package com.datatorrent.contrib.kafka;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.contrib.kafka.AbstractKafkaInputOperator;
import com.google.common.collect.HashMultimap;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/contrib/kafka/OffsetManagerTest.class */
public class OffsetManagerTest extends KafkaOperatorTestBase {
    static final int totalCount = 100;
    static CountDownLatch latch;
    static final String OFFSET_FILE = ".offset";
    static final Logger logger = LoggerFactory.getLogger(KafkaPartitionableInputOperatorTest.class);
    static List<String> collectedTuples = new LinkedList();
    static long initialPos = 10;
    static Path baseFolder = new Path("target");

    /* loaded from: input_file:com/datatorrent/contrib/kafka/OffsetManagerTest$CollectorInputPort.class */
    public static class CollectorInputPort extends DefaultInputPort<String> {
        public CollectorInputPort(Operator operator) {
        }

        public void process(String str) {
            if (!str.equals("END_TUPLE")) {
                OffsetManagerTest.collectedTuples.add(str);
            } else if (OffsetManagerTest.latch != null) {
                OffsetManagerTest.latch.countDown();
            }
        }

        public void setConnected(boolean z) {
            if (z) {
                OffsetManagerTest.collectedTuples.clear();
            }
        }
    }

    /* loaded from: input_file:com/datatorrent/contrib/kafka/OffsetManagerTest$CollectorModule.class */
    public static class CollectorModule extends BaseOperator {
        public final transient CollectorInputPort inputPort = new CollectorInputPort(this);
    }

    /* loaded from: input_file:com/datatorrent/contrib/kafka/OffsetManagerTest$TestOffsetManager.class */
    public static class TestOffsetManager implements OffsetManager {
        private final transient Map<KafkaPartition, Long> offsets = Collections.synchronizedMap(new HashMap());
        private String filename = null;
        private transient FileSystem fs = FileSystem.get(new Configuration());
        private transient FileContext fc = FileContext.getFileContext(this.fs.getUri());

        public Map<KafkaPartition, Long> loadInitialOffsets() {
            KafkaPartition kafkaPartition = new KafkaPartition(KafkaOperatorTestBase.TEST_TOPIC, 0);
            KafkaPartition kafkaPartition2 = new KafkaPartition(KafkaOperatorTestBase.TEST_TOPIC, 1);
            this.offsets.put(kafkaPartition, Long.valueOf(OffsetManagerTest.initialPos));
            this.offsets.put(kafkaPartition2, Long.valueOf(OffsetManagerTest.initialPos));
            return this.offsets;
        }

        public void updateOffsets(Map<KafkaPartition, Long> map) {
            this.offsets.putAll(map);
            try {
                Path path = new Path(OffsetManagerTest.baseFolder, this.filename + ".tmp");
                Path path2 = new Path(OffsetManagerTest.baseFolder, this.filename);
                FSDataOutputStream create = this.fs.create(path, true);
                for (Map.Entry<KafkaPartition, Long> entry : this.offsets.entrySet()) {
                    create.writeBytes(entry.getKey() + ", " + entry.getValue() + "\n");
                }
                create.close();
                this.fc.rename(path, path2, new Options.Rename[]{Options.Rename.OVERWRITE});
            } catch (Exception e) {
            }
            countdownLatch();
        }

        private void countdownLatch() {
            if (OffsetManagerTest.latch.getCount() == 1) {
                int i = 0;
                Iterator<Long> it = this.offsets.values().iterator();
                while (it.hasNext()) {
                    i = (int) (i + it.next().longValue());
                }
                if (i == 100) {
                    OffsetManagerTest.latch.countDown();
                }
            }
        }

        public void setFilename(String str) {
            this.filename = str;
        }

        public String getFilename() {
            return this.filename;
        }
    }

    public OffsetManagerTest() {
        this.hasMultiPartition = true;
    }

    @Test
    public void testSimpleConsumerUpdateOffsets() throws Exception {
        initialPos = 10L;
        try {
            testPartitionableInputOperator(new SimpleKafkaConsumer(), (100 - ((int) initialPos)) - ((int) initialPos));
            cleanFile();
        } catch (Throwable th) {
            cleanFile();
            throw th;
        }
    }

    @Test
    public void testSimpleConsumerInvalidInitialOffsets() throws Exception {
        initialPos = 1000L;
        try {
            testPartitionableInputOperator(new SimpleKafkaConsumer(), 100);
            cleanFile();
        } catch (Throwable th) {
            cleanFile();
            throw th;
        }
    }

    private void cleanFile() {
        try {
            FileSystem.get(new Configuration()).delete(new Path(baseFolder, "test_topic.offset"), true);
        } catch (IOException e) {
        }
    }

    public void testPartitionableInputOperator(KafkaConsumer kafkaConsumer, int i) throws Exception {
        latch = new CountDownLatch(3);
        KafkaTestProducer kafkaTestProducer = new KafkaTestProducer(KafkaOperatorTestBase.TEST_TOPIC, true);
        kafkaTestProducer.setProducerType("sync");
        kafkaTestProducer.setSendCount(100);
        kafkaTestProducer.run();
        kafkaTestProducer.close();
        LocalMode newInstance = LocalMode.newInstance();
        DAG dag = newInstance.getDAG();
        KafkaSinglePortStringInputOperator addOperator = dag.addOperator("Kafka message consumer", KafkaSinglePortStringInputOperator.class);
        TestOffsetManager testOffsetManager = new TestOffsetManager();
        testOffsetManager.setFilename("test_topic.offset");
        addOperator.setInitialPartitionCount(1);
        addOperator.setOffsetManager(testOffsetManager);
        addOperator.setStrategy(AbstractKafkaInputOperator.PartitionStrategy.ONE_TO_MANY.toString());
        addOperator.setRepartitionInterval(-1L);
        kafkaConsumer.setTopic(KafkaOperatorTestBase.TEST_TOPIC);
        HashMultimap.create();
        kafkaConsumer.setZookeeper("localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]);
        kafkaConsumer.setInitialOffset("earliest");
        addOperator.setConsumer(kafkaConsumer);
        dag.addStream("Kafka message", addOperator.outputPort, dag.addOperator("TestMessageCollector", new CollectorModule()).inputPort).setLocality(DAG.Locality.CONTAINER_LOCAL);
        dag.setAttribute(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 1);
        LocalMode.Controller controller = newInstance.getController();
        controller.setHeartbeatMonitoringEnabled(true);
        controller.runAsync();
        Assert.assertTrue("TIMEOUT: 30s, collected " + collectedTuples.size() + " tuples", latch.await(30000L, TimeUnit.MILLISECONDS));
        Assert.assertEquals("Tuple count " + collectedTuples, i, collectedTuples.size());
        logger.debug(String.format("Number of emitted tuples: %d", Integer.valueOf(collectedTuples.size())));
        kafkaTestProducer.close();
        controller.shutdown();
    }
}
