package org.apache.flink.streaming.connectors.kafka;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.BitSet;
import java.util.Properties;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.Time;
import org.apache.commons.lang.SerializationUtils;
import org.apache.curator.test.TestingServer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.Offset;
import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.util.Collector;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

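/**
 * Integration tests for the Kafka connectors. A local ZooKeeper {@link TestingServer} and a
 * single {@link KafkaServer} broker are started once per class in {@link #prepare()}; each test
 * runs a local Flink streaming job that writes to and reads from a topic on that broker.
 */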
public class KafkaITCase {
    // Ports for ZooKeeper and the Kafka broker; assigned in prepare() from NetUtils.getAvailablePort().
    private static int zkPort;
    private static int kafkaPort;
    // Local host name; handed to the broker as "advertised.host.name" in getKafkaServer().
    private static String kafkaHost;
    // "localhost:<zkPort>"; used by the broker config, the topic utils and every Kafka source/sink below.
    private static String zookeeperConnectionString;
    // Working directories created from tempFolder in prepare() (ZooKeeper data dir, Kafka "log.dir").
    public static File tmpZkDir;
    public static File tmpKafkaDir;
    // Services started in prepare() and stopped in shutDownServices(); null until started.
    private static TestingServer zookeeper;
    private static KafkaServer broker1;
    private static final Logger LOG = LoggerFactory.getLogger(KafkaITCase.class);

    // JUnit class rule providing the temporary folders used for tmpZkDir / tmpKafkaDir.
    @ClassRule
    public static TemporaryFolder tempFolder = new TemporaryFolder();
    // Set to true by CustomPartitioner.partition(); asserted in customPartitioningTestTopology().
    private static boolean partitionerHasBeenCalled = false;

    /**
     * Partitioner used by {@code customPartitioningTestTopology}. It records that it was invoked
     * and maps the tuple's {@code f0} value to a partition: values below 10 go to partition 0,
     * values below 20 to partition 1, everything else to partition 2.
     */
    private static class CustomPartitioner implements SerializableKafkaPartitioner {
        private CustomPartitioner() {
        }

        /**
         * @param key the record to partition; must be a {@link Tuple2} whose {@code f0} is a {@link Long}
         * @param numPartitions unused by this implementation
         * @return 0, 1 or 2 depending on the value of {@code f0}
         */
        public int partition(Object key, int numPartitions) {
            // Remember the invocation so the test can assert the partitioner was really used.
            KafkaITCase.partitionerHasBeenCalled = true;

            final Tuple2<?, ?> record = (Tuple2<?, ?>) key;
            final long id = ((Long) record.f0).longValue();
            if (id >= 20) {
                return 2;
            }
            return id >= 10 ? 1 : 0;
        }
    }

    /**
     * {@link Time} implementation backed by the JVM's system clock; passed to the
     * {@link KafkaServer} constructor in {@code getKafkaServer}.
     */
    public static class LocalSystemTime implements Time {
        /** @return the current wall-clock time from {@link System#currentTimeMillis()} */
        public long milliseconds() {
            return System.currentTimeMillis();
        }

        /** @return the current value of {@link System#nanoTime()} */
        public long nanoseconds() {
            return System.nanoTime();
        }

        /**
         * Sleeps for the given number of milliseconds. An interruption is logged and ends the
         * sleep early; the thread's interrupt status is restored so callers can still observe it.
         *
         * @param millis time to sleep, in milliseconds
         */
        public void sleep(long millis) {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                KafkaITCase.LOG.warn("Interruption", e);
                // Thread.sleep cleared the flag when it threw; set it again instead of losing it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Marker exception thrown by the validating sinks once all expected elements have been
     * received. The tests look for it in the cause chain of the job failure to detect success.
     */
    public static class SuccessException extends Exception {
        private static final long serialVersionUID = 1L;
    }

    /**
     * Schema that converts {@code Tuple2<Long, String>} records to and from byte arrays using
     * Java serialization (via commons-lang {@link SerializationUtils}). It serves both as the
     * serialization schema of the Kafka sinks and the deserialization schema of the Kafka sources.
     */
    private static class TupleSerializationSchema implements DeserializationSchema<Tuple2<Long, String>>, SerializationSchema<Tuple2<Long, String>, byte[]> {
        private TupleSerializationSchema() {
        }

        /**
         * Restores a tuple from bytes produced by {@link #serialize(Tuple2)}.
         *
         * @param message the serialized tuple
         * @return the deserialized tuple
         */
        @SuppressWarnings("unchecked") // the topic only ever contains tuples written by serialize()
        public Tuple2<Long, String> deserialize(byte[] message) {
            return (Tuple2<Long, String>) SerializationUtils.deserialize(message);
        }

        /**
         * @param element the tuple to serialize
         * @return the Java-serialized form of {@code element}
         */
        public byte[] serialize(Tuple2<Long, String> element) {
            return SerializationUtils.serialize(element);
        }

        /** The test stream has no end marker, so this always returns {@code false}. */
        public boolean isEndOfStream(Tuple2<Long, String> nextElement) {
            return false;
        }
    }

    /**
     * Class-level setup: creates the temporary working directories, picks free ports and starts
     * ZooKeeper followed by one Kafka broker. Any failure while starting the services is logged
     * and reported through {@link Assert#fail(String)}.
     *
     * @throws IOException if a temporary folder cannot be created or the local host cannot be resolved
     */
    @BeforeClass
    public static void prepare() throws IOException {
        LOG.info("Starting KafkaITCase.prepare()");

        // Working directories for the two services.
        tmpZkDir = tempFolder.newFolder();
        tmpKafkaDir = tempFolder.newFolder();

        // Network coordinates.
        kafkaHost = InetAddress.getLocalHost().getHostName();
        zkPort = NetUtils.getAvailablePort();
        kafkaPort = NetUtils.getAvailablePort();
        zookeeperConnectionString = "localhost:" + zkPort;

        // Reset the handles so shutDownServices() only touches what was actually started.
        zookeeper = null;
        broker1 = null;

        try {
            LOG.info("Starting Zookeeper");
            zookeeper = getZookeeper();

            LOG.info("Starting KafkaServer");
            broker1 = getKafkaServer(0);

            LOG.info("ZK and KafkaServer started.");
        } catch (Throwable startupFailure) {
            LOG.warn("Test failed with exception", startupFailure);
            Assert.fail("Test failed with: " + startupFailure.getMessage());
        }
    }

    /**
     * Class-level teardown: shuts down the Kafka broker and then stops ZooKeeper, skipping
     * whichever was never started. ZooKeeper is stopped even if the broker shutdown throws, so a
     * failing broker cannot leave the ZooKeeper server running.
     */
    @AfterClass
    public static void shutDownServices() {
        LOG.info("Shutting down all services");
        try {
            if (broker1 != null) {
                broker1.shutdown();
            }
        } finally {
            if (zookeeper != null) {
                try {
                    zookeeper.stop();
                } catch (IOException e) {
                    // Best effort: a failing stop must not fail the test class.
                    LOG.warn("ZK.stop() failed", e);
                }
            }
        }
    }

    /**
     * Writes tuples {@code (1000 + i, "kafka-" + i)} to a single-partition topic through a
     * {@link KafkaSink} and reads them back with a {@link KafkaSource}. The consuming sink checks
     * that the first 100 received elements are distinct and contiguous, then throws a
     * {@link SuccessException} to terminate the otherwise endless job.
     *
     * <p>The test passes when the job failure has a {@link SuccessException} within the first
     * 21 links of its cause chain; any other job failure fails the test.
     */
    @Test
    public void regularKafkaSourceTest() throws Exception {
        LOG.info("Starting KafkaITCase.regularKafkaSourceTest()");
        final String topic = "regularKafkaSourceTestTopic";
        createTestTopic(topic, 1);
        final LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

        // Consuming side: Kafka -> validating sink.
        env.addSource(new KafkaSource(zookeeperConnectionString, topic, "myFlinkGroup", new TupleSerializationSchema(), 5000L)).addSink(new SinkFunction<Tuple2<Long, String>>() {
            private int elCnt = 0;
            // Id of the first element seen; bit positions in the validator are relative to it.
            private int start = -1;
            private final BitSet validator = new BitSet(101);

            public void invoke(Tuple2<Long, String> value) throws Exception {
                KafkaITCase.LOG.info("Got " + value);
                int id = Integer.parseInt(value.f1.split("-")[1]);
                // The producer writes f0 = 1000 + id.
                Assert.assertEquals(value.f0.longValue() - 1000, id);
                if (this.start == -1) {
                    this.start = id;
                }
                Assert.assertFalse("Received tuple twice", this.validator.get(id - this.start));
                this.validator.set(id - this.start);
                this.elCnt++;
                if (this.elCnt == 100) {
                    // All of the bits 0..99 must be set once 100 elements have arrived.
                    int nextClearBit = this.validator.nextClearBit(0);
                    if (nextClearBit == 100) {
                        throw new SuccessException();
                    }
                    throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nextClearBit + " Set: " + this.validator);
                }
            }
        });

        // Producing side: generator -> Kafka.
        env.addSource(new SourceFunction<Tuple2<Long, String>>() {
            private static final long serialVersionUID = 1;
            // Written by cancel() and read by run(), possibly on different threads.
            private volatile boolean running = true;

            public void run(Collector<Tuple2<Long, String>> collector) throws Exception {
                KafkaITCase.LOG.info("Starting source.");
                int i = 0;
                while (this.running) {
                    collector.collect(new Tuple2<Long, String>(Long.valueOf(1000 + i), "kafka-" + i));
                    i++;
                    try {
                        Thread.sleep(100L);
                    } catch (InterruptedException ignored) {
                        // Deliberately ignored: the loop ends only through cancel().
                    }
                }
            }

            public void cancel() {
                KafkaITCase.LOG.info("Source got cancel()");
                this.running = false;
            }
        }).addSink(new KafkaSink(zookeeperConnectionString, topic, new TupleSerializationSchema()));

        try {
            env.setParallelism(1);
            env.execute();
        } catch (JobExecutionException e) {
            // Look for the SuccessException in the cause chain of the job failure.
            Throwable cause = e.getCause();
            int depth = 0;
            while (!(cause instanceof SuccessException)) {
                // Stop at the end of the chain (null) as well as at the depth limit, so a real
                // failure is reported as such instead of surfacing as a NullPointerException.
                if (cause == null || depth++ == 20) {
                    LOG.warn("Test failed with exception", e);
                    Assert.fail("Test failed with: " + e.getMessage());
                }
                cause = cause.getCause();
            }
        }
        LOG.info("Finished KafkaITCase.regularKafkaSourceTest()");
    }

    /**
     * Same round trip as {@code regularKafkaSourceTest}, but reading with a
     * {@link PersistentKafkaSource} that starts from the beginning of the topic. Tuples
     * {@code (1000 + i, "kafka-" + i)} are written through a {@link KafkaSink}; the consuming
     * sink checks that the first 100 received elements are distinct and contiguous, then throws
     * a {@link SuccessException} to terminate the otherwise endless job.
     *
     * <p>The test passes when the job failure has a {@link SuccessException} within the first
     * 21 links of its cause chain; any other job failure fails the test.
     */
    @Test
    public void tupleTestTopology() throws Exception {
        LOG.info("Starting KafkaITCase.tupleTestTopology()");
        final String topic = "tupleTestTopic";
        createTestTopic(topic, 1);
        final LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

        // Consuming side: Kafka -> validating sink.
        env.addSource(new PersistentKafkaSource(zookeeperConnectionString, topic, new TupleSerializationSchema(), 5000, 100, Offset.FROM_BEGINNING)).addSink(new SinkFunction<Tuple2<Long, String>>() {
            private int elCnt = 0;
            // Id of the first element seen; bit positions in the validator are relative to it.
            private int start = -1;
            private final BitSet validator = new BitSet(101);

            public void invoke(Tuple2<Long, String> value) throws Exception {
                KafkaITCase.LOG.info("Got " + value);
                int id = Integer.parseInt(value.f1.split("-")[1]);
                // The producer writes f0 = 1000 + id.
                Assert.assertEquals(value.f0.longValue() - 1000, id);
                if (this.start == -1) {
                    this.start = id;
                }
                Assert.assertFalse("Received tuple twice", this.validator.get(id - this.start));
                this.validator.set(id - this.start);
                this.elCnt++;
                if (this.elCnt == 100) {
                    // All of the bits 0..99 must be set once 100 elements have arrived.
                    int nextClearBit = this.validator.nextClearBit(0);
                    if (nextClearBit == 100) {
                        throw new SuccessException();
                    }
                    throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nextClearBit + " Set: " + this.validator);
                }
            }
        });

        // Producing side: generator -> Kafka.
        env.addSource(new SourceFunction<Tuple2<Long, String>>() {
            private static final long serialVersionUID = 1;
            // Written by cancel() and read by run(), possibly on different threads.
            private volatile boolean running = true;

            public void run(Collector<Tuple2<Long, String>> collector) throws Exception {
                KafkaITCase.LOG.info("Starting source.");
                int i = 0;
                while (this.running) {
                    collector.collect(new Tuple2<Long, String>(Long.valueOf(1000 + i), "kafka-" + i));
                    i++;
                    try {
                        Thread.sleep(100L);
                    } catch (InterruptedException ignored) {
                        // Deliberately ignored: the loop ends only through cancel().
                    }
                }
            }

            public void cancel() {
                KafkaITCase.LOG.info("Source got cancel()");
                this.running = false;
            }
        }).addSink(new KafkaSink(zookeeperConnectionString, topic, new TupleSerializationSchema()));

        try {
            env.setParallelism(1);
            env.execute();
        } catch (JobExecutionException e) {
            // Look for the SuccessException in the cause chain of the job failure.
            Throwable cause = e.getCause();
            int depth = 0;
            while (!(cause instanceof SuccessException)) {
                // Stop at the end of the chain (null) as well as at the depth limit, so a real
                // failure is reported as such instead of surfacing as a NullPointerException.
                if (cause == null || depth++ == 20) {
                    LOG.warn("Test failed with exception", e);
                    Assert.fail("Test failed with: " + e.getMessage());
                }
                cause = cause.getCause();
            }
        }
        LOG.info("Finished KafkaITCase.tupleTestTopology()");
    }

    /**
     * Round trip through a three-partition topic using {@code CustomPartitioner} on the sink.
     * Tuples {@code (1000 + i, "kafka-" + i)} are written through a {@link KafkaSink} and read
     * back with a {@link PersistentKafkaSource}. Once the elements with ids 9, 19 and 99 have all
     * been seen, the consuming sink requires exactly the bits 0..99 (relative to the first id
     * received) to be set and throws a {@link SuccessException} to terminate the job.
     *
     * <p>The test passes when the job failure has a {@link SuccessException} within the first
     * 21 links of its cause chain and the partitioner was invoked; any other job failure is
     * rethrown.
     */
    @Test
    public void customPartitioningTestTopology() throws Exception {
        LOG.info("Starting KafkaITCase.customPartitioningTestTopology()");
        final String topic = "customPartitioningTestTopic";
        createTestTopic(topic, 3);
        final LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

        // Consuming side: Kafka -> validating sink.
        env.addSource(new PersistentKafkaSource(zookeeperConnectionString, topic, new TupleSerializationSchema(), 5000, 100, Offset.FROM_BEGINNING)).addSink(new SinkFunction<Tuple2<Long, String>>() {
            // Id of the first element seen; bit positions in the validator are relative to it.
            private int start = -1;
            private final BitSet validator = new BitSet(101);
            // Set when the marker ids 9, 19 and 99 respectively have been received.
            private boolean gotPartition1 = false;
            private boolean gotPartition2 = false;
            private boolean gotPartition3 = false;

            public void invoke(Tuple2<Long, String> value) throws Exception {
                KafkaITCase.LOG.info("Got " + value);
                int id = Integer.parseInt(value.f1.split("-")[1]);
                // The producer writes f0 = 1000 + id.
                Assert.assertEquals(value.f0.longValue() - 1000, id);
                switch (id) {
                    case 9:
                        this.gotPartition1 = true;
                        break;
                    case 19:
                        this.gotPartition2 = true;
                        break;
                    case 99:
                        this.gotPartition3 = true;
                        break;
                }
                if (this.start == -1) {
                    this.start = id;
                }
                Assert.assertFalse("Received tuple twice", this.validator.get(id - this.start));
                this.validator.set(id - this.start);
                if (this.gotPartition1 && this.gotPartition2 && this.gotPartition3) {
                    int nextClearBit = this.validator.nextClearBit(0);
                    if (nextClearBit == 100) {
                        throw new SuccessException();
                    }
                    throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nextClearBit + " Set: " + this.validator);
                }
            }
        });

        // Producing side: generator -> Kafka, routed by the custom partitioner.
        env.addSource(new SourceFunction<Tuple2<Long, String>>() {
            private static final long serialVersionUID = 1;
            // Written by cancel() and read by run(), possibly on different threads.
            private volatile boolean running = true;

            public void run(Collector<Tuple2<Long, String>> collector) throws Exception {
                KafkaITCase.LOG.info("Starting source.");
                int i = 0;
                while (this.running) {
                    collector.collect(new Tuple2<Long, String>(Long.valueOf(1000 + i), "kafka-" + i));
                    i++;
                    try {
                        Thread.sleep(100L);
                    } catch (InterruptedException ignored) {
                        // Deliberately ignored: the loop ends only through cancel().
                    }
                }
            }

            public void cancel() {
                KafkaITCase.LOG.info("Source got cancel()");
                this.running = false;
            }
        }).addSink(new KafkaSink(zookeeperConnectionString, topic, new TupleSerializationSchema(), new CustomPartitioner()));

        try {
            env.setParallelism(1);
            env.execute();
        } catch (JobExecutionException e) {
            // Look for the SuccessException in the cause chain of the job failure.
            Throwable cause = e.getCause();
            int depth = 0;
            while (!(cause instanceof SuccessException)) {
                // Stop at the end of the chain (null) as well as at the depth limit, so a real
                // failure is rethrown instead of surfacing as a NullPointerException.
                if (cause == null || depth++ == 20) {
                    throw e;
                }
                cause = cause.getCause();
            }
            Assert.assertTrue(partitionerHasBeenCalled);
        }
        LOG.info("Finished KafkaITCase.customPartitioningTestTopology()");
    }

    /**
     * String-based round trip: writes {@code "kafka-" + i} to a single-partition topic through a
     * {@link KafkaSink} using {@link JavaDefaultStringSchema} and reads it back with a
     * {@link PersistentKafkaSource}. The consuming sink checks that the first 100 received
     * elements are distinct and contiguous, then throws a {@link SuccessException} to terminate
     * the otherwise endless job.
     *
     * <p>The test passes when the job failure has a {@link SuccessException} within the first
     * 21 links of its cause chain; any other job failure fails the test.
     */
    @Test
    public void simpleTestTopology() throws Exception {
        final String topic = "simpleTestTopic";
        createTestTopic(topic, 1);
        final LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

        // Consuming side: Kafka -> validating sink.
        env.addSource(new PersistentKafkaSource(zookeeperConnectionString, topic, new JavaDefaultStringSchema(), 5000, 100, Offset.FROM_BEGINNING)).addSink(new SinkFunction<String>() {
            private int elCnt = 0;
            // Id of the first element seen; bit positions in the validator are relative to it.
            private int start = -1;
            private final BitSet validator = new BitSet(101);

            public void invoke(String value) throws Exception {
                KafkaITCase.LOG.info("Got " + value);
                int id = Integer.parseInt(value.split("-")[1]);
                if (this.start == -1) {
                    this.start = id;
                }
                Assert.assertFalse("Received tuple twice", this.validator.get(id - this.start));
                this.validator.set(id - this.start);
                this.elCnt++;
                if (this.elCnt == 100) {
                    // All of the bits 0..99 must be set once 100 elements have arrived.
                    int nextClearBit = this.validator.nextClearBit(0);
                    if (nextClearBit == 100) {
                        throw new SuccessException();
                    }
                    throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nextClearBit + " Set: " + this.validator);
                }
            }
        });

        // Producing side: generator -> Kafka.
        env.addSource(new SourceFunction<String>() {
            private static final long serialVersionUID = 1;
            // Written by cancel() and read by run(), possibly on different threads.
            private volatile boolean running = true;

            public void run(Collector<String> collector) throws Exception {
                KafkaITCase.LOG.info("Starting source.");
                int i = 0;
                while (this.running) {
                    collector.collect("kafka-" + i);
                    i++;
                    try {
                        Thread.sleep(100L);
                    } catch (InterruptedException ignored) {
                        // Deliberately ignored: the loop ends only through cancel().
                    }
                }
            }

            public void cancel() {
                KafkaITCase.LOG.info("Source got cancel()");
                this.running = false;
            }
        }).addSink(new KafkaSink(zookeeperConnectionString, topic, new JavaDefaultStringSchema()));

        try {
            env.setParallelism(1);
            env.execute();
        } catch (JobExecutionException e) {
            // Look for the SuccessException in the cause chain of the job failure.
            Throwable cause = e.getCause();
            int depth = 0;
            while (!(cause instanceof SuccessException)) {
                // Stop at the end of the chain (null) as well as at the depth limit, so a real
                // failure is reported as such instead of surfacing as a NullPointerException.
                if (cause == null || depth++ == 20) {
                    LOG.warn("Test failed with exception", e);
                    Assert.fail("Test failed with: " + e.getMessage());
                }
                cause = cause.getCause();
            }
        }
    }

    /**
     * Creates a topic on the test cluster through {@link KafkaTopicUtils}.
     *
     * @param topicName name of the topic to create
     * @param numberOfPartitions number of partitions for the topic
     */
    private void createTestTopic(String topicName, int numberOfPartitions) {
        final KafkaTopicUtils topicUtils = new KafkaTopicUtils(zookeeperConnectionString);
        // Third argument is fixed to 1 -- presumably the replication factor (single broker); TODO confirm.
        topicUtils.createTopic(topicName, numberOfPartitions, 1);
    }

    /**
     * Creates the Curator {@link TestingServer} on {@code zkPort}, using {@code tmpZkDir} as its
     * directory.
     *
     * @return the new testing server
     * @throws Exception if the server cannot be created
     */
    private static TestingServer getZookeeper() throws Exception {
        final TestingServer server = new TestingServer(zkPort, tmpZkDir);
        return server;
    }

    /**
     * Configures and starts a Kafka broker that registers with the test ZooKeeper instance.
     *
     * @param brokerId value for the broker's {@code broker.id} setting
     * @return the started broker
     */
    private static KafkaServer getKafkaServer(int brokerId) throws UnknownHostException {
        final Properties brokerConfig = new Properties();
        brokerConfig.setProperty("advertised.host.name", kafkaHost);
        brokerConfig.setProperty("port", Integer.toString(kafkaPort));
        brokerConfig.setProperty("broker.id", Integer.toString(brokerId));
        brokerConfig.setProperty("log.dir", tmpKafkaDir.toString());
        brokerConfig.setProperty("zookeeper.connect", zookeeperConnectionString);

        final KafkaConfig config = new KafkaConfig(brokerConfig);
        final KafkaServer server = new KafkaServer(config, new LocalSystemTime());
        server.startup();
        return server;
    }
}
