/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import kafka.common.NotLeaderForPartitionException;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.server.KafkaServer;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.client.JobCancellationException;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.connectors.kafka.KafkaTestBase;
import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper;
import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper;
import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2FlinkPartitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.testutils.junit.RetryOnException;
import org.apache.flink.testutils.junit.RetryRule;
import org.apache.flink.util.Collector;
import org.apache.kafka.common.errors.TimeoutException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;

public abstract class KafkaConsumerTestBase
extends KafkaTestBase {
    @Rule
    public RetryRule retryRule = new RetryRule();

    @Before
    public void ensureNoJobIsLingering() throws Exception {
        JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
    }

    public void runFailOnNoBrokerTest() throws Exception {
        try {
            Properties properties = new Properties();
            StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
            see.getConfig().disableSysoutLogging();
            see.setRestartStrategy(RestartStrategies.noRestart());
            see.setParallelism(1);
            properties.setProperty("bootstrap.servers", "localhost:80");
            properties.setProperty("zookeeper.connect", "localhost:80");
            properties.setProperty("group.id", "test");
            properties.setProperty("request.timeout.ms", "3000");
            properties.setProperty("socket.timeout.ms", "3000");
            properties.setProperty("session.timeout.ms", "2000");
            properties.setProperty("fetch.max.wait.ms", "2000");
            properties.setProperty("heartbeat.interval.ms", "1000");
            properties.putAll((Map<?, ?>)secureProps);
            FlinkKafkaConsumerBase source = kafkaServer.getConsumer("doesntexist", new SimpleStringSchema(), properties);
            DataStreamSource stream = see.addSource(source);
            stream.print();
            see.execute("No broker test");
        }
        catch (JobExecutionException jee) {
            if (kafkaServer.getVersion().equals("0.9") || kafkaServer.getVersion().equals("0.10")) {
                Assert.assertTrue((boolean)(jee.getCause() instanceof TimeoutException));
                TimeoutException te = (TimeoutException)jee.getCause();
                Assert.assertEquals((Object)"Timeout expired while fetching topic metadata", (Object)te.getMessage());
            }
            Assert.assertTrue((boolean)(jee.getCause() instanceof RuntimeException));
            RuntimeException re = (RuntimeException)jee.getCause();
            Assert.assertTrue((boolean)re.getMessage().contains("Unable to retrieve any partitions for the requested topics [doesntexist]"));
        }
    }

    public void runCommitOffsetsToKafka() throws Exception {
        int parallelism = 3;
        int recordsInEachPartition = 50;
        String topicName = this.writeSequence("testCommitOffsetsToKafkaTopic", 50, 3, 1);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
        env.setParallelism(3);
        env.enableCheckpointing(200L);
        DataStreamSource stream = env.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps));
        stream.addSink((SinkFunction)new DiscardingSink());
        final AtomicReference errorRef = new AtomicReference();
        Thread runner = new Thread("runner"){

            @Override
            public void run() {
                block2: {
                    try {
                        env.execute();
                    }
                    catch (Throwable t) {
                        if (t.getCause() instanceof JobCancellationException) break block2;
                        errorRef.set(t);
                    }
                }
            }
        };
        runner.start();
        Long l50 = 50L;
        long deadline = 30000000000L + System.nanoTime();
        KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
        do {
            Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
            Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
            Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
            if (l50.equals(o1) && l50.equals(o2) && l50.equals(o3)) break;
            Thread.sleep(100L);
        } while (System.nanoTime() < deadline);
        JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
        Throwable t = (Throwable)errorRef.get();
        if (t != null) {
            throw new RuntimeException("Job failed with an exception", t);
        }
        Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
        Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
        Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
        Assert.assertEquals((Object)50L, (Object)o1);
        Assert.assertEquals((Object)50L, (Object)o2);
        Assert.assertEquals((Object)50L, (Object)o3);
        kafkaOffsetHandler.close();
        KafkaConsumerTestBase.deleteTestTopic(topicName);
    }

    public void runStartFromKafkaCommitOffsets() throws Exception {
        Long o3;
        Long o2;
        Long o1;
        int parallelism = 3;
        int recordsInEachPartition = 300;
        int recordsToConsume = 150;
        int consumePause = 50;
        String topicName = this.writeSequence("testStartFromKafkaCommitOffsetsTopic", 300, 3, 1);
        KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
        int attempt = 0;
        do {
            LOG.info("Attempt " + ++attempt + " to read records and commit some offsets to Kafka");
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.getConfig().disableSysoutLogging();
            env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
            env.setParallelism(3);
            env.enableCheckpointing(20L);
            env.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps)).map(new ThrottledMapper(50)).map((MapFunction)new MapFunction<String, Object>(){
                int count = 0;

                public Object map(String value) throws Exception {
                    ++this.count;
                    if (this.count == 150) {
                        throw new SuccessException();
                    }
                    return null;
                }
            }).addSink((SinkFunction)new DiscardingSink());
            TestUtils.tryExecute((StreamExecutionEnvironment)env, (String)"Read some records to commit offsets to Kafka");
            o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
            o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
            o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
        } while (o1 == null && o2 == null && o3 == null && attempt < 3);
        if (o1 == null && o2 == null && o3 == null) {
            throw new RuntimeException("No offsets have been committed after 3 attempts");
        }
        LOG.info("Got final committed offsets from Kafka o1={}, o2={}, o3={}", new Object[]{o1, o2, o3});
        StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();
        env2.getConfig().disableSysoutLogging();
        env2.getConfig().setRestartStrategy(RestartStrategies.noRestart());
        env2.setParallelism(3);
        HashMap<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset = new HashMap<Integer, Tuple2<Integer, Integer>>();
        partitionsToValuesCountAndStartOffset.put(0, new Tuple2((Object)(o1 != null ? (int)(300L - o1) : 300), (Object)(o1 != null ? o1.intValue() : 0)));
        partitionsToValuesCountAndStartOffset.put(1, new Tuple2((Object)(o2 != null ? (int)(300L - o2) : 300), (Object)(o2 != null ? o2.intValue() : 0)));
        partitionsToValuesCountAndStartOffset.put(2, new Tuple2((Object)(o3 != null ? (int)(300L - o3) : 300), (Object)(o3 != null ? o3.intValue() : 0)));
        this.readSequence(env2, StartupMode.GROUP_OFFSETS, null, standardProps, topicName, partitionsToValuesCountAndStartOffset);
        kafkaOffsetHandler.close();
        KafkaConsumerTestBase.deleteTestTopic(topicName);
    }

    public void runAutoOffsetRetrievalAndCommitToKafka() throws Exception {
        int parallelism = 3;
        int recordsInEachPartition = 50;
        String topicName = this.writeSequence("testAutoOffsetRetrievalAndCommitToKafkaTopic", 50, 3, 1);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
        env.setParallelism(3);
        env.enableCheckpointing(200L);
        Properties readProps = new Properties();
        readProps.putAll((Map<?, ?>)standardProps);
        readProps.setProperty("auto.offset.reset", "latest");
        DataStreamSource stream = env.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), readProps));
        stream.addSink((SinkFunction)new DiscardingSink());
        final AtomicReference errorRef = new AtomicReference();
        Thread runner = new Thread("runner"){

            @Override
            public void run() {
                block2: {
                    try {
                        env.execute();
                    }
                    catch (Throwable t) {
                        if (t.getCause() instanceof JobCancellationException) break block2;
                        errorRef.set(t);
                    }
                }
            }
        };
        runner.start();
        KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
        Long l50 = 50L;
        long deadline = 30000000000L + System.nanoTime();
        do {
            Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
            Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
            Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
            if (l50.equals(o1) && l50.equals(o2) && l50.equals(o3)) break;
            Thread.sleep(100L);
        } while (System.nanoTime() < deadline);
        JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
        Throwable t = (Throwable)errorRef.get();
        if (t != null) {
            throw new RuntimeException("Job failed with an exception", t);
        }
        Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
        Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
        Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
        Assert.assertEquals((Object)50L, (Object)o1);
        Assert.assertEquals((Object)50L, (Object)o2);
        Assert.assertEquals((Object)50L, (Object)o3);
        kafkaOffsetHandler.close();
        KafkaConsumerTestBase.deleteTestTopic(topicName);
    }

    public void runStartFromEarliestOffsets() throws Exception {
        int parallelism = 3;
        int recordsInEachPartition = 50;
        String topicName = this.writeSequence("testStartFromEarliestOffsetsTopic", 50, 3, 1);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.setParallelism(3);
        Properties readProps = new Properties();
        readProps.putAll((Map<?, ?>)standardProps);
        readProps.setProperty("auto.offset.reset", "latest");
        KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
        kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23L);
        kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31L);
        kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43L);
        this.readSequence(env, StartupMode.EARLIEST, null, readProps, 3, topicName, 50, 0);
        kafkaOffsetHandler.close();
        KafkaConsumerTestBase.deleteTestTopic(topicName);
    }

    public void runStartFromLatestOffsets() throws Exception {
        int parallelism = 3;
        int recordsInEachPartition = 50;
        int extraRecordsInEachPartition = 200;
        String topicName = this.writeSequence("testStartFromLatestOffsetsTopic", 50, 3, 1);
        KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
        kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23L);
        kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31L);
        kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43L);
        String consumeExtraRecordsJobName = "Consume Extra Records Job";
        String writeExtraRecordsJobName = "Write Extra Records Job";
        TypeInformation resultType = TypeInformation.of((TypeHint)new TypeHint<Tuple2<Integer, Integer>>(){});
        KeyedSerializationSchemaWrapper serSchema = new KeyedSerializationSchemaWrapper((SerializationSchema)new TypeInformationSerializationSchema(resultType, new ExecutionConfig()));
        KeyedDeserializationSchemaWrapper deserSchema = new KeyedDeserializationSchemaWrapper((DeserializationSchema)new TypeInformationSerializationSchema(resultType, new ExecutionConfig()));
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.setParallelism(3);
        Properties readProps = new Properties();
        readProps.putAll((Map<?, ?>)standardProps);
        readProps.setProperty("auto.offset.reset", "earliest");
        FlinkKafkaConsumerBase latestReadingConsumer = kafkaServer.getConsumer(topicName, deserSchema, readProps);
        latestReadingConsumer.setStartFromLatest();
        env.addSource(latestReadingConsumer).setParallelism(3).flatMap((FlatMapFunction)new FlatMapFunction<Tuple2<Integer, Integer>, Object>(){

            public void flatMap(Tuple2<Integer, Integer> value, Collector<Object> out) throws Exception {
                if ((Integer)value.f1 - 50 < 0) {
                    throw new RuntimeException("test failed; consumed a record that was previously written: " + value);
                }
            }
        }).setParallelism(1).addSink((SinkFunction)new DiscardingSink());
        final AtomicReference error = new AtomicReference();
        Thread consumeThread = new Thread(new Runnable(){

            @Override
            public void run() {
                block2: {
                    try {
                        env.execute("Consume Extra Records Job");
                    }
                    catch (Throwable t) {
                        if (t instanceof JobCancellationException) break block2;
                        error.set(t);
                    }
                }
            }
        });
        consumeThread.start();
        JobManagerCommunicationUtils.waitUntilJobIsRunning(flink.getLeaderGateway(timeout), "Consume Extra Records Job");
        StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();
        env2.setParallelism(3);
        DataStreamSource extraRecordsStream = env2.addSource((SourceFunction)new RichParallelSourceFunction<Tuple2<Integer, Integer>>(){
            private boolean running = true;

            public void run(SourceFunction.SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
                int partition = this.getRuntimeContext().getIndexOfThisSubtask();
                for (int count = 50; this.running && count < 250; ++count) {
                    ctx.collect((Object)new Tuple2((Object)partition, (Object)count));
                }
            }

            public void cancel() {
                this.running = false;
            }
        });
        kafkaServer.produceIntoKafka(extraRecordsStream, topicName, serSchema, readProps, null);
        try {
            env2.execute("Write Extra Records Job");
        }
        catch (Exception e) {
            throw new RuntimeException("Writing extra records failed", e);
        }
        JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout), "Consume Extra Records Job");
        consumeThread.join();
        kafkaOffsetHandler.close();
        KafkaConsumerTestBase.deleteTestTopic(topicName);
        Throwable consumerError = (Throwable)error.get();
        if (consumerError != null) {
            throw new Exception("Exception in the consuming thread", consumerError);
        }
    }

    public void runStartFromGroupOffsets() throws Exception {
        int parallelism = 3;
        int recordsInEachPartition = 50;
        String topicName = this.writeSequence("testStartFromGroupOffsetsTopic", 50, 3, 1);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.setParallelism(3);
        Properties readProps = new Properties();
        readProps.putAll((Map<?, ?>)standardProps);
        readProps.setProperty("auto.offset.reset", "earliest");
        KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
        kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23L);
        kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43L);
        HashMap<Integer, Tuple2<Integer, Integer>> partitionsToValueCountAndStartOffsets = new HashMap<Integer, Tuple2<Integer, Integer>>();
        partitionsToValueCountAndStartOffsets.put(0, new Tuple2((Object)27, (Object)23));
        partitionsToValueCountAndStartOffsets.put(1, new Tuple2((Object)50, (Object)0));
        partitionsToValueCountAndStartOffsets.put(2, new Tuple2((Object)7, (Object)43));
        this.readSequence(env, StartupMode.GROUP_OFFSETS, null, readProps, topicName, partitionsToValueCountAndStartOffsets);
        kafkaOffsetHandler.close();
        KafkaConsumerTestBase.deleteTestTopic(topicName);
    }

    public void runStartFromSpecificOffsets() throws Exception {
        int parallelism = 4;
        int recordsInEachPartition = 50;
        String topicName = this.writeSequence("testStartFromSpecificOffsetsTopic", 50, 4, 1);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.setParallelism(4);
        Properties readProps = new Properties();
        readProps.putAll((Map<?, ?>)standardProps);
        readProps.setProperty("auto.offset.reset", "earliest");
        HashMap<KafkaTopicPartition, Long> specificStartupOffsets = new HashMap<KafkaTopicPartition, Long>();
        specificStartupOffsets.put(new KafkaTopicPartition(topicName, 0), 19L);
        specificStartupOffsets.put(new KafkaTopicPartition(topicName, 2), 22L);
        specificStartupOffsets.put(new KafkaTopicPartition(topicName, 4), 26L);
        KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
        kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23L);
        kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31L);
        kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43L);
        HashMap<Integer, Tuple2<Integer, Integer>> partitionsToValueCountAndStartOffsets = new HashMap<Integer, Tuple2<Integer, Integer>>();
        partitionsToValueCountAndStartOffsets.put(0, new Tuple2((Object)31, (Object)19));
        partitionsToValueCountAndStartOffsets.put(1, new Tuple2((Object)19, (Object)31));
        partitionsToValueCountAndStartOffsets.put(2, new Tuple2((Object)28, (Object)22));
        partitionsToValueCountAndStartOffsets.put(3, new Tuple2((Object)50, (Object)0));
        this.readSequence(env, StartupMode.SPECIFIC_OFFSETS, specificStartupOffsets, readProps, topicName, partitionsToValueCountAndStartOffsets);
        kafkaOffsetHandler.close();
        KafkaConsumerTestBase.deleteTestTopic(topicName);
    }

    @RetryOnException(times=2, exception=NotLeaderForPartitionException.class)
    public void runSimpleConcurrentProducerConsumerTopology() throws Exception {
        String topic = "concurrentProducerConsumerTopic_" + UUID.randomUUID().toString();
        String additionalEmptyTopic = "additionalEmptyTopic_" + UUID.randomUUID().toString();
        int parallelism = 3;
        int elementsPerPartition = 100;
        int totalElements = 300;
        KafkaConsumerTestBase.createTestTopic(topic, 3, 2);
        KafkaConsumerTestBase.createTestTopic(additionalEmptyTopic, 3, 1);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        env.enableCheckpointing(500L);
        env.setRestartStrategy(RestartStrategies.noRestart());
        env.getConfig().disableSysoutLogging();
        TypeInformation longStringType = TypeInfoParser.parse((String)"Tuple2<Long, String>");
        TypeInformationSerializationSchema sourceSchema = new TypeInformationSerializationSchema(longStringType, env.getConfig());
        TypeInformationSerializationSchema sinkSchema = new TypeInformationSerializationSchema(longStringType, env.getConfig());
        DataStreamSource stream = env.addSource((SourceFunction)new RichParallelSourceFunction<Tuple2<Long, String>>(){
            private boolean running = true;

            public void run(SourceFunction.SourceContext<Tuple2<Long, String>> ctx) throws InterruptedException {
                int cnt;
                int limit = cnt + 100;
                for (cnt = this.getRuntimeContext().getIndexOfThisSubtask() * 100; this.running && cnt < limit; ++cnt) {
                    ctx.collect((Object)new Tuple2((Object)(1000L + (long)cnt), (Object)("kafka-" + cnt)));
                    Thread.sleep(50L);
                }
            }

            public void cancel() {
                this.running = false;
            }
        });
        Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList((String)brokerConnectionStrings);
        producerProperties.setProperty("retries", "3");
        producerProperties.putAll((Map<?, ?>)secureProps);
        kafkaServer.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper((SerializationSchema)sinkSchema), producerProperties, null);
        ArrayList<String> topics = new ArrayList<String>();
        topics.add(topic);
        topics.add(additionalEmptyTopic);
        Properties props = new Properties();
        props.putAll((Map<?, ?>)standardProps);
        props.putAll((Map<?, ?>)secureProps);
        FlinkKafkaConsumerBase source = kafkaServer.getConsumer((List<String>)topics, sourceSchema, props);
        DataStreamSource consuming = env.addSource(source).setParallelism(3);
        consuming.addSink((SinkFunction)new RichSinkFunction<Tuple2<Long, String>>(){
            private int elCnt = 0;
            private BitSet validator = new BitSet(300);

            public void invoke(Tuple2<Long, String> value) throws Exception {
                String[] sp = ((String)value.f1).split("-");
                int v = Integer.parseInt(sp[1]);
                Assert.assertEquals((long)((Long)value.f0 - 1000L), (long)v);
                Assert.assertFalse((String)"Received tuple twice", (boolean)this.validator.get(v));
                this.validator.set(v);
                ++this.elCnt;
                if (this.elCnt == 300) {
                    int nc = this.validator.nextClearBit(0);
                    if (nc != 300) {
                        Assert.fail((String)("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + this.validator));
                    }
                    throw new SuccessException();
                }
            }

            public void close() throws Exception {
                super.close();
            }
        }).setParallelism(1);
        try {
            KafkaConsumerTestBase.tryExecutePropagateExceptions(env, "runSimpleConcurrentProducerConsumerTopology");
        }
        catch (ProgramInvocationException | JobExecutionException e) {
            int depth = 0;
            for (Throwable cause = e.getCause(); cause != null && depth++ < 20; cause = cause.getCause()) {
                if (!(cause instanceof NotLeaderForPartitionException)) continue;
                throw (Exception)cause;
            }
            throw e;
        }
        KafkaConsumerTestBase.deleteTestTopic(topic);
    }

    public void runOneToOneExactlyOnceTest() throws Exception {
        String topic = "oneToOneTopic";
        int parallelism = 5;
        int numElementsPerPartition = 1000;
        int totalElements = 5000;
        int failAfterElements = 333;
        KafkaConsumerTestBase.createTestTopic("oneToOneTopic", 5, 1);
        DataGenerators.generateRandomizedIntegerSequence(StreamExecutionEnvironment.getExecutionEnvironment(), kafkaServer, "oneToOneTopic", 5, 1000, true);
        TypeInformationSerializationSchema schema = new TypeInformationSerializationSchema((TypeInformation)BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(500L);
        env.setParallelism(5);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart((int)1, (long)0L));
        env.getConfig().disableSysoutLogging();
        Properties props = new Properties();
        props.putAll((Map<?, ?>)standardProps);
        props.putAll((Map<?, ?>)secureProps);
        FlinkKafkaConsumerBase kafkaSource = kafkaServer.getConsumer("oneToOneTopic", schema, props);
        env.addSource(kafkaSource).map((MapFunction)new PartitionValidatingMapper(5, 1)).map(new FailingIdentityMapper(333)).addSink((SinkFunction)new ValidatingExactlyOnceSink(5000)).setParallelism(1);
        FailingIdentityMapper.failedBefore = false;
        TestUtils.tryExecute((StreamExecutionEnvironment)env, (String)"One-to-one exactly once test");
        KafkaConsumerTestBase.deleteTestTopic("oneToOneTopic");
    }

    public void runOneSourceMultiplePartitionsExactlyOnceTest() throws Exception {
        String topic = "oneToManyTopic";
        int numPartitions = 5;
        int numElementsPerPartition = 1000;
        int totalElements = 5000;
        int failAfterElements = 333;
        int parallelism = 2;
        KafkaConsumerTestBase.createTestTopic("oneToManyTopic", 5, 1);
        DataGenerators.generateRandomizedIntegerSequence(StreamExecutionEnvironment.getExecutionEnvironment(), kafkaServer, "oneToManyTopic", 5, 1000, false);
        TypeInformationSerializationSchema schema = new TypeInformationSerializationSchema((TypeInformation)BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(500L);
        env.setParallelism(2);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart((int)1, (long)0L));
        env.getConfig().disableSysoutLogging();
        Properties props = new Properties();
        props.putAll((Map<?, ?>)standardProps);
        props.putAll((Map<?, ?>)secureProps);
        FlinkKafkaConsumerBase kafkaSource = kafkaServer.getConsumer("oneToManyTopic", schema, props);
        env.addSource(kafkaSource).map((MapFunction)new PartitionValidatingMapper(5, 3)).map(new FailingIdentityMapper(333)).addSink((SinkFunction)new ValidatingExactlyOnceSink(5000)).setParallelism(1);
        FailingIdentityMapper.failedBefore = false;
        TestUtils.tryExecute((StreamExecutionEnvironment)env, (String)"One-source-multi-partitions exactly once test");
        KafkaConsumerTestBase.deleteTestTopic("oneToManyTopic");
    }

    public void runMultipleSourcesOnePartitionExactlyOnceTest() throws Exception {
        String topic = "manyToOneTopic";
        int numPartitions = 5;
        int numElementsPerPartition = 1000;
        int totalElements = 5000;
        int failAfterElements = 333;
        int parallelism = 8;
        KafkaConsumerTestBase.createTestTopic("manyToOneTopic", 5, 1);
        DataGenerators.generateRandomizedIntegerSequence(StreamExecutionEnvironment.getExecutionEnvironment(), kafkaServer, "manyToOneTopic", 5, 1000, true);
        TypeInformationSerializationSchema schema = new TypeInformationSerializationSchema((TypeInformation)BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(500L);
        env.setParallelism(8);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart((int)1, (long)0L));
        env.getConfig().disableSysoutLogging();
        env.setBufferTimeout(0L);
        Properties props = new Properties();
        props.putAll((Map<?, ?>)standardProps);
        props.putAll((Map<?, ?>)secureProps);
        FlinkKafkaConsumerBase kafkaSource = kafkaServer.getConsumer("manyToOneTopic", schema, props);
        env.addSource(kafkaSource).map((MapFunction)new PartitionValidatingMapper(5, 1)).map(new FailingIdentityMapper(333)).addSink((SinkFunction)new ValidatingExactlyOnceSink(5000)).setParallelism(1);
        FailingIdentityMapper.failedBefore = false;
        TestUtils.tryExecute((StreamExecutionEnvironment)env, (String)"multi-source-one-partitions exactly once test");
        KafkaConsumerTestBase.deleteTestTopic("manyToOneTopic");
    }

    public void runCancelingOnFullInputTest() throws Exception {
        String topic = "cancelingOnFullTopic";
        int parallelism = 3;
        KafkaConsumerTestBase.createTestTopic("cancelingOnFullTopic", 3, 1);
        DataGenerators.InfiniteStringsGenerator generator = new DataGenerators.InfiniteStringsGenerator(kafkaServer, "cancelingOnFullTopic");
        generator.start();
        final AtomicReference jobError = new AtomicReference();
        Runnable jobRunner = new Runnable(){

            @Override
            public void run() {
                try {
                    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                    env.setParallelism(3);
                    env.enableCheckpointing(100L);
                    env.getConfig().disableSysoutLogging();
                    Properties props = new Properties();
                    props.putAll((Map<?, ?>)KafkaTestBase.standardProps);
                    props.putAll((Map<?, ?>)KafkaTestBase.secureProps);
                    FlinkKafkaConsumerBase source = KafkaTestBase.kafkaServer.getConsumer("cancelingOnFullTopic", new SimpleStringSchema(), props);
                    env.addSource(source).addSink((SinkFunction)new DiscardingSink());
                    env.execute("Runner for CancelingOnFullInputTest");
                }
                catch (Throwable t) {
                    jobError.set(t);
                }
            }
        };
        Thread runnerThread = new Thread(jobRunner, "program runner thread");
        runnerThread.start();
        Thread.sleep(2000L);
        Throwable failueCause = (Throwable)jobError.get();
        if (failueCause != null) {
            failueCause.printStackTrace();
            Assert.fail((String)("Test failed prematurely with: " + failueCause.getMessage()));
        }
        JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout), "Runner for CancelingOnFullInputTest");
        runnerThread.join();
        failueCause = (Throwable)jobError.get();
        Assert.assertNotNull((String)"program did not fail properly due to canceling", (Object)failueCause);
        Assert.assertTrue((boolean)failueCause.getMessage().contains("Job was cancelled"));
        if (generator.isAlive()) {
            generator.shutdown();
            generator.join();
        } else {
            Throwable t = generator.getError();
            if (t != null) {
                t.printStackTrace();
                Assert.fail((String)("Generator failed: " + t.getMessage()));
            } else {
                Assert.fail((String)"Generator failed with no exception");
            }
        }
        KafkaConsumerTestBase.deleteTestTopic("cancelingOnFullTopic");
    }

    public void runCancelingOnEmptyInputTest() throws Exception {
        String topic = "cancelingOnEmptyInputTopic";
        int parallelism = 3;
        KafkaConsumerTestBase.createTestTopic("cancelingOnEmptyInputTopic", 3, 1);
        final AtomicReference error = new AtomicReference();
        Runnable jobRunner = new Runnable(){

            @Override
            public void run() {
                try {
                    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                    env.setParallelism(3);
                    env.enableCheckpointing(100L);
                    env.getConfig().disableSysoutLogging();
                    Properties props = new Properties();
                    props.putAll((Map<?, ?>)KafkaTestBase.standardProps);
                    props.putAll((Map<?, ?>)KafkaTestBase.secureProps);
                    FlinkKafkaConsumerBase source = KafkaTestBase.kafkaServer.getConsumer("cancelingOnEmptyInputTopic", new SimpleStringSchema(), props);
                    env.addSource(source).addSink((SinkFunction)new DiscardingSink());
                    env.execute("CancelingOnEmptyInputTest");
                }
                catch (Throwable t) {
                    KafkaTestBase.LOG.error("Job Runner failed with exception", t);
                    error.set(t);
                }
            }
        };
        Thread runnerThread = new Thread(jobRunner, "program runner thread");
        runnerThread.start();
        Thread.sleep(2000L);
        Throwable failueCause = (Throwable)error.get();
        if (failueCause != null) {
            failueCause.printStackTrace();
            Assert.fail((String)("Test failed prematurely with: " + failueCause.getMessage()));
        }
        JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
        runnerThread.join();
        failueCause = (Throwable)error.get();
        Assert.assertNotNull((String)"program did not fail properly due to canceling", (Object)failueCause);
        Assert.assertTrue((boolean)failueCause.getMessage().contains("Job was cancelled"));
        KafkaConsumerTestBase.deleteTestTopic("cancelingOnEmptyInputTopic");
    }

    public void runFailOnDeployTest() throws Exception {
        String topic = "failOnDeployTopic";
        KafkaConsumerTestBase.createTestTopic("failOnDeployTopic", 2, 1);
        TypeInformationSerializationSchema schema = new TypeInformationSerializationSchema((TypeInformation)BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(12);
        env.getConfig().disableSysoutLogging();
        Properties props = new Properties();
        props.putAll((Map<?, ?>)standardProps);
        props.putAll((Map<?, ?>)secureProps);
        FlinkKafkaConsumerBase kafkaSource = kafkaServer.getConsumer("failOnDeployTopic", schema, props);
        env.addSource(kafkaSource).addSink((SinkFunction)new DiscardingSink());
        try {
            env.execute("test fail on deploy");
            Assert.fail((String)"this test should fail with an exception");
        }
        catch (JobExecutionException e) {
            int depth = 0;
            boolean foundResourceException = false;
            for (Throwable cause = e.getCause(); cause != null && depth++ < 20; cause = cause.getCause()) {
                if (!(cause instanceof NoResourceAvailableException)) continue;
                foundResourceException = true;
                break;
            }
            Assert.assertTrue((String)"Wrong exception", (boolean)foundResourceException);
        }
        KafkaConsumerTestBase.deleteTestTopic("failOnDeployTopic");
    }

    public void runProduceConsumeMultipleTopics() throws Exception {
        int NUM_TOPICS = 5;
        int NUM_ELEMENTS = 20;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        ArrayList<String> topics = new ArrayList<String>();
        for (int i = 0; i < 5; ++i) {
            String topic = "topic-" + i;
            topics.add(topic);
            KafkaConsumerTestBase.createTestTopic(topic, i + 1, 1);
        }
        env.setParallelism(1);
        DataStreamSource stream = env.addSource((SourceFunction)new RichParallelSourceFunction<Tuple3<Integer, Integer, String>>(){

            public void run(SourceFunction.SourceContext<Tuple3<Integer, Integer, String>> ctx) throws Exception {
                int partition = this.getRuntimeContext().getIndexOfThisSubtask();
                for (int topicId = 0; topicId < 5; ++topicId) {
                    for (int i = 0; i < 20; ++i) {
                        ctx.collect((Object)new Tuple3((Object)partition, (Object)i, (Object)("topic-" + topicId)));
                    }
                }
            }

            public void cancel() {
            }
        });
        Tuple2WithTopicSchema schema = new Tuple2WithTopicSchema(env.getConfig());
        Properties props = new Properties();
        props.putAll((Map<?, ?>)standardProps);
        props.putAll((Map<?, ?>)secureProps);
        kafkaServer.produceIntoKafka(stream, "dummy", schema, props, null);
        env.execute("Write to topics");
        env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        stream = env.addSource(kafkaServer.getConsumer(topics, schema, props));
        stream.flatMap((FlatMapFunction)new FlatMapFunction<Tuple3<Integer, Integer, String>, Integer>(){
            Map<String, Integer> countPerTopic = new HashMap<String, Integer>(5);

            public void flatMap(Tuple3<Integer, Integer, String> value, Collector<Integer> out) throws Exception {
                Integer count = this.countPerTopic.get(value.f2);
                if (count == null) {
                    count = 1;
                } else {
                    Integer n = count;
                    Integer n2 = count = Integer.valueOf(count + 1);
                }
                this.countPerTopic.put((String)value.f2, count);
                for (Map.Entry<String, Integer> el : this.countPerTopic.entrySet()) {
                    if (el.getValue() < 20) break;
                    if (el.getValue() <= 20) continue;
                    throw new RuntimeException("There is a failure in the test. I've read " + el.getValue() + " from topic " + el.getKey());
                }
                throw new SuccessException();
            }
        }).setParallelism(1);
        TestUtils.tryExecute((StreamExecutionEnvironment)env, (String)"Count elements from the topics");
        for (int i = 0; i < 5; ++i) {
            String topic = "topic-" + i;
            KafkaConsumerTestBase.deleteTestTopic(topic);
        }
    }

    public void runBigRecordTestTopology() throws Exception {
        String topic = "bigRecordTestTopic";
        boolean parallelism = true;
        KafkaConsumerTestBase.createTestTopic("bigRecordTestTopic", 1, 1);
        TypeInformation longBytesInfo = TypeInfoParser.parse((String)"Tuple2<Long, byte[]>");
        TypeInformationSerializationSchema serSchema = new TypeInformationSerializationSchema(longBytesInfo, new ExecutionConfig());
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(RestartStrategies.noRestart());
        env.getConfig().disableSysoutLogging();
        env.enableCheckpointing(100L);
        env.setParallelism(1);
        Properties consumerProps = new Properties();
        consumerProps.putAll((Map<?, ?>)standardProps);
        consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(0xE00000));
        consumerProps.setProperty("max.partition.fetch.bytes", Integer.toString(0xE00000));
        consumerProps.setProperty("queued.max.message.chunks", "1");
        consumerProps.putAll((Map<?, ?>)secureProps);
        FlinkKafkaConsumerBase source = kafkaServer.getConsumer("bigRecordTestTopic", serSchema, consumerProps);
        DataStreamSource consuming = env.addSource(source);
        consuming.addSink((SinkFunction)new SinkFunction<Tuple2<Long, byte[]>>(){
            private int elCnt = 0;

            public void invoke(Tuple2<Long, byte[]> value) throws Exception {
                ++this.elCnt;
                if ((Long)value.f0 == -1L) {
                    if (this.elCnt == 11) {
                        throw new SuccessException();
                    }
                    throw new RuntimeException("There have been " + this.elCnt + " elements");
                }
                if (this.elCnt > 10) {
                    throw new RuntimeException("More than 10 elements seen: " + this.elCnt);
                }
            }
        });
        Properties producerProps = new Properties();
        producerProps.setProperty("max.request.size", Integer.toString(0xF00000));
        producerProps.setProperty("retries", "3");
        producerProps.putAll((Map<?, ?>)secureProps);
        producerProps.setProperty("bootstrap.servers", brokerConnectionStrings);
        DataStreamSource stream = env.addSource((SourceFunction)new RichSourceFunction<Tuple2<Long, byte[]>>(){
            private boolean running;

            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                this.running = true;
            }

            public void run(SourceFunction.SourceContext<Tuple2<Long, byte[]>> ctx) throws Exception {
                Random rnd = new Random();
                long cnt = 0L;
                int sevenMb = 0x700000;
                while (this.running) {
                    byte[] wl = new byte[sevenMb + rnd.nextInt(sevenMb)];
                    ctx.collect((Object)new Tuple2((Object)cnt++, (Object)wl));
                    Thread.sleep(100L);
                    if (cnt != 10L) continue;
                    ctx.collect((Object)new Tuple2((Object)-1L, (Object)new byte[]{1}));
                    break;
                }
            }

            public void cancel() {
                this.running = false;
            }
        });
        kafkaServer.produceIntoKafka(stream, "bigRecordTestTopic", new KeyedSerializationSchemaWrapper((SerializationSchema)serSchema), producerProps, null);
        TestUtils.tryExecute((StreamExecutionEnvironment)env, (String)"big topology test");
        KafkaConsumerTestBase.deleteTestTopic("bigRecordTestTopic");
    }

    public void runBrokerFailureTest() throws Exception {
        String topic = "brokerFailureTestTopic";
        int parallelism = 2;
        int numElementsPerPartition = 1000;
        int totalElements = 2000;
        int failAfterElements = 333;
        KafkaConsumerTestBase.createTestTopic("brokerFailureTestTopic", 2, 2);
        DataGenerators.generateRandomizedIntegerSequence(StreamExecutionEnvironment.getExecutionEnvironment(), kafkaServer, "brokerFailureTestTopic", 2, 1000, true);
        int leaderId = kafkaServer.getLeaderToShutDown("brokerFailureTestTopic");
        LOG.info("Leader to shutdown {}", (Object)leaderId);
        TypeInformationSerializationSchema schema = new TypeInformationSerializationSchema((TypeInformation)BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.enableCheckpointing(500L);
        env.setRestartStrategy(RestartStrategies.noRestart());
        env.getConfig().disableSysoutLogging();
        Properties props = new Properties();
        props.putAll((Map<?, ?>)standardProps);
        props.putAll((Map<?, ?>)secureProps);
        FlinkKafkaConsumerBase kafkaSource = kafkaServer.getConsumer("brokerFailureTestTopic", schema, props);
        env.addSource(kafkaSource).map((MapFunction)new PartitionValidatingMapper(2, 1)).map(new BrokerKillingMapper(leaderId, 333)).addSink((SinkFunction)new ValidatingExactlyOnceSink(2000)).setParallelism(1);
        BrokerKillingMapper.killedLeaderBefore = false;
        TestUtils.tryExecute((StreamExecutionEnvironment)env, (String)"Broker failure once test");
        kafkaServer.restartBroker(leaderId);
    }

    public void runKeyValueTest() throws Exception {
        String topic = "keyvaluetest";
        KafkaConsumerTestBase.createTestTopic("keyvaluetest", 1, 1);
        int ELEMENT_COUNT = 5000;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setRestartStrategy(RestartStrategies.noRestart());
        env.getConfig().disableSysoutLogging();
        DataStreamSource kvStream = env.addSource((SourceFunction)new SourceFunction<Tuple2<Long, PojoValue>>(){

            public void run(SourceFunction.SourceContext<Tuple2<Long, PojoValue>> ctx) throws Exception {
                Random rnd = new Random(1337L);
                for (long i = 0L; i < 5000L; ++i) {
                    PojoValue pojo = new PojoValue();
                    pojo.when = new Date(rnd.nextLong());
                    pojo.lon = rnd.nextLong();
                    pojo.lat = i;
                    Long key = i % 2L == 0L ? null : Long.valueOf(i);
                    ctx.collect((Object)new Tuple2((Object)key, (Object)pojo));
                }
            }

            public void cancel() {
            }
        });
        TypeInformationKeyValueSerializationSchema schema = new TypeInformationKeyValueSerializationSchema(Long.class, PojoValue.class, env.getConfig());
        Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList((String)brokerConnectionStrings);
        producerProperties.setProperty("retries", "3");
        kafkaServer.produceIntoKafka(kvStream, "keyvaluetest", schema, producerProperties, null);
        env.execute("Write KV to Kafka");
        env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setRestartStrategy(RestartStrategies.noRestart());
        env.getConfig().disableSysoutLogging();
        TypeInformationKeyValueSerializationSchema readSchema = new TypeInformationKeyValueSerializationSchema(Long.class, PojoValue.class, env.getConfig());
        Properties props = new Properties();
        props.putAll((Map<?, ?>)standardProps);
        props.putAll((Map<?, ?>)secureProps);
        DataStreamSource fromKafka = env.addSource(kafkaServer.getConsumer("keyvaluetest", readSchema, props));
        fromKafka.flatMap((FlatMapFunction)new RichFlatMapFunction<Tuple2<Long, PojoValue>, Object>(){
            long counter = 0L;

            public void flatMap(Tuple2<Long, PojoValue> value, Collector<Object> out) throws Exception {
                Assert.assertTrue((String)("Wrong value " + ((PojoValue)value.f1).lat), (((PojoValue)value.f1).lat == this.counter ? 1 : 0) != 0);
                if (((PojoValue)value.f1).lat % 2L == 0L) {
                    Assert.assertNull((String)"key was not null", (Object)value.f0);
                } else {
                    Assert.assertTrue((String)("Wrong value " + value.f0), ((Long)value.f0 == this.counter ? 1 : 0) != 0);
                }
                ++this.counter;
                if (this.counter == 5000L) {
                    throw new SuccessException();
                }
            }
        });
        TestUtils.tryExecute((StreamExecutionEnvironment)env, (String)"Read KV from Kafka");
        KafkaConsumerTestBase.deleteTestTopic("keyvaluetest");
    }

    public void runAllDeletesTest() throws Exception {
        String topic = "alldeletestest";
        KafkaConsumerTestBase.createTestTopic("alldeletestest", 1, 1);
        int ELEMENT_COUNT = 300;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
        env.getConfig().disableSysoutLogging();
        DataStreamSource kvStream = env.addSource((SourceFunction)new SourceFunction<Tuple2<byte[], PojoValue>>(){

            public void run(SourceFunction.SourceContext<Tuple2<byte[], PojoValue>> ctx) throws Exception {
                Random rnd = new Random(1337L);
                for (long i = 0L; i < 300L; ++i) {
                    byte[] key = new byte[200];
                    rnd.nextBytes(key);
                    ctx.collect((Object)new Tuple2((Object)key, (Object)null));
                }
            }

            public void cancel() {
            }
        });
        TypeInformationKeyValueSerializationSchema schema = new TypeInformationKeyValueSerializationSchema(byte[].class, PojoValue.class, env.getConfig());
        Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList((String)brokerConnectionStrings);
        producerProperties.setProperty("retries", "3");
        producerProperties.putAll((Map<?, ?>)secureProps);
        kafkaServer.produceIntoKafka(kvStream, "alldeletestest", schema, producerProperties, null);
        env.execute("Write deletes to Kafka");
        env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
        env.getConfig().disableSysoutLogging();
        Properties props = new Properties();
        props.putAll((Map<?, ?>)standardProps);
        props.putAll((Map<?, ?>)secureProps);
        DataStreamSource fromKafka = env.addSource(kafkaServer.getConsumer("alldeletestest", schema, props));
        fromKafka.flatMap((FlatMapFunction)new RichFlatMapFunction<Tuple2<byte[], PojoValue>, Object>(){
            long counter = 0L;

            public void flatMap(Tuple2<byte[], PojoValue> value, Collector<Object> out) throws Exception {
                Assert.assertNull((Object)value.f1);
                ++this.counter;
                if (this.counter == 300L) {
                    throw new SuccessException();
                }
            }
        });
        TestUtils.tryExecute((StreamExecutionEnvironment)env, (String)"Read deletes from Kafka");
        KafkaConsumerTestBase.deleteTestTopic("alldeletestest");
    }

    public void runEndOfStreamTest() throws Exception {
        int ELEMENT_COUNT = 300;
        String topic = this.writeSequence("testEndOfStream", 300, 1, 1);
        StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment();
        env1.setParallelism(1);
        env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
        env1.getConfig().disableSysoutLogging();
        Properties props = new Properties();
        props.putAll((Map<?, ?>)standardProps);
        props.putAll((Map<?, ?>)secureProps);
        DataStreamSource fromKafka = env1.addSource(kafkaServer.getConsumer(topic, new FixedNumberDeserializationSchema(300), props));
        fromKafka.flatMap((FlatMapFunction)new FlatMapFunction<Tuple2<Integer, Integer>, Void>(){

            public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {
            }
        });
        JobExecutionResult result = TestUtils.tryExecute((StreamExecutionEnvironment)env1, (String)"Consume 300 elements from Kafka");
        KafkaConsumerTestBase.deleteTestTopic(topic);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void runMetricsTest() throws Throwable {
        String topic = "metricsStream";
        KafkaConsumerTestBase.createTestTopic("metricsStream", 5, 1);
        final Tuple1 error = new Tuple1(null);
        Runnable job = new Runnable(){

            @Override
            public void run() {
                block2: {
                    try {
                        StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment();
                        env1.setParallelism(1);
                        env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
                        env1.getConfig().disableSysoutLogging();
                        env1.disableOperatorChaining();
                        Properties props = new Properties();
                        props.putAll((Map<?, ?>)KafkaTestBase.standardProps);
                        props.putAll((Map<?, ?>)KafkaTestBase.secureProps);
                        TypeInformationSerializationSchema schema = new TypeInformationSerializationSchema(TypeInfoParser.parse((String)"Tuple2<Integer, Integer>"), env1.getConfig());
                        DataStreamSource fromKafka = env1.addSource(KafkaTestBase.kafkaServer.getConsumer("metricsStream", schema, KafkaTestBase.standardProps));
                        fromKafka.flatMap((FlatMapFunction)new FlatMapFunction<Tuple2<Integer, Integer>, Void>(){

                            public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {
                            }
                        });
                        DataStreamSource fromGen = env1.addSource((SourceFunction)new RichSourceFunction<Tuple2<Integer, Integer>>(){
                            boolean running = true;

                            public void run(SourceFunction.SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
                                int i = 0;
                                while (this.running) {
                                    ctx.collect((Object)Tuple2.of((Object)i++, (Object)this.getRuntimeContext().getIndexOfThisSubtask()));
                                    Thread.sleep(1L);
                                }
                            }

                            public void cancel() {
                                this.running = false;
                            }
                        });
                        KafkaTestBase.kafkaServer.produceIntoKafka(fromGen, "metricsStream", new KeyedSerializationSchemaWrapper((SerializationSchema)schema), KafkaTestBase.standardProps, null);
                        env1.execute("Metrics test job");
                    }
                    catch (Throwable t) {
                        KafkaTestBase.LOG.warn("Got exception during execution", t);
                        if (t instanceof JobCancellationException) break block2;
                        error.f0 = t;
                    }
                }
            }
        };
        Thread jobThread = new Thread(job);
        jobThread.start();
        try {
            MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
            Set<ObjectName> offsetMetrics = mBeanServer.queryNames(new ObjectName("*current-offsets*:*"), null);
            while (offsetMetrics.size() < 5) {
                if (error.f0 != null) {
                    throw (Throwable)error.f0;
                }
                offsetMetrics = mBeanServer.queryNames(new ObjectName("*current-offsets*:*"), null);
                Thread.sleep(50L);
            }
            Assert.assertEquals((long)5L, (long)offsetMetrics.size());
            while (true) {
                int numPosOffsets = 0;
                for (ObjectName object : offsetMetrics) {
                    Object offset = mBeanServer.getAttribute(object, "Value");
                    if ((Long)offset < 0L) continue;
                    ++numPosOffsets;
                }
                if (numPosOffsets == 5) break;
                Thread.sleep(50L);
            }
            Set<ObjectName> producerMetrics = mBeanServer.queryNames(new ObjectName("*KafkaProducer*:*"), null);
            Assert.assertTrue((String)"No producer metrics found", (producerMetrics.size() > 30 ? 1 : 0) != 0);
            LOG.info("Found all JMX metrics. Cancelling job.");
        }
        finally {
            JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
        }
        while (jobThread.isAlive()) {
            Thread.sleep(50L);
        }
        if (error.f0 != null) {
            throw (Throwable)error.f0;
        }
        KafkaConsumerTestBase.deleteTestTopic("metricsStream");
    }

    protected void readSequence(StreamExecutionEnvironment env, StartupMode startupMode, Map<KafkaTopicPartition, Long> specificStartupOffsets, Properties cc, String topicName, final Map<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset) throws Exception {
        int sourceParallelism = partitionsToValuesCountAndStartOffset.keySet().size();
        int finalCountTmp = 0;
        for (Map.Entry<Integer, Tuple2<Integer, Integer>> valuesCountAndStartOffset : partitionsToValuesCountAndStartOffset.entrySet()) {
            finalCountTmp += ((Integer)valuesCountAndStartOffset.getValue().f0).intValue();
        }
        final int finalCount = finalCountTmp;
        TypeInformation intIntTupleType = TypeInfoParser.parse((String)"Tuple2<Integer, Integer>");
        TypeInformationSerializationSchema deser = new TypeInformationSerializationSchema(intIntTupleType, env.getConfig());
        cc.putAll((Map<?, ?>)secureProps);
        FlinkKafkaConsumerBase consumer = kafkaServer.getConsumer(topicName, deser, cc);
        switch (startupMode) {
            case EARLIEST: {
                consumer.setStartFromEarliest();
                break;
            }
            case LATEST: {
                consumer.setStartFromLatest();
                break;
            }
            case SPECIFIC_OFFSETS: {
                consumer.setStartFromSpecificOffsets(specificStartupOffsets);
                break;
            }
            case GROUP_OFFSETS: {
                consumer.setStartFromGroupOffsets();
            }
        }
        SingleOutputStreamOperator source = env.addSource(consumer).setParallelism(sourceParallelism).map(new ThrottledMapper(20)).setParallelism(sourceParallelism);
        source.flatMap((FlatMapFunction)new RichFlatMapFunction<Tuple2<Integer, Integer>, Integer>(){
            private HashMap<Integer, BitSet> partitionsToValueCheck;
            private int count = 0;

            public void open(Configuration parameters) throws Exception {
                this.partitionsToValueCheck = new HashMap();
                for (Integer partition : partitionsToValuesCountAndStartOffset.keySet()) {
                    this.partitionsToValueCheck.put(partition, new BitSet());
                }
            }

            public void flatMap(Tuple2<Integer, Integer> value, Collector<Integer> out) throws Exception {
                int partition = (Integer)value.f0;
                int val = (Integer)value.f1;
                BitSet bitSet = this.partitionsToValueCheck.get(partition);
                if (bitSet == null) {
                    throw new RuntimeException("Got a record from an unknown partition");
                }
                bitSet.set(val - (Integer)((Tuple2)partitionsToValuesCountAndStartOffset.get((Object)Integer.valueOf((int)partition))).f1);
                ++this.count;
                KafkaTestBase.LOG.info("Received message {}, total {} messages", value, (Object)this.count);
                if (this.count == finalCount) {
                    for (Map.Entry<Integer, BitSet> partitionsToValueCheck : this.partitionsToValueCheck.entrySet()) {
                        BitSet check = partitionsToValueCheck.getValue();
                        int expectedValueCount = (Integer)((Tuple2)partitionsToValuesCountAndStartOffset.get((Object)partitionsToValueCheck.getKey())).f0;
                        if (check.cardinality() != expectedValueCount) {
                            throw new RuntimeException("Expected cardinality to be " + expectedValueCount + ", but was " + check.cardinality());
                        }
                        if (check.nextClearBit(0) == expectedValueCount) continue;
                        throw new RuntimeException("Expected next clear bit to be " + expectedValueCount + ", but was " + check.cardinality());
                    }
                    throw new SuccessException();
                }
            }
        }).setParallelism(1);
        TestUtils.tryExecute((StreamExecutionEnvironment)env, (String)"Read data from Kafka");
        LOG.info("Successfully read sequence for verification");
    }

    protected void readSequence(StreamExecutionEnvironment env, StartupMode startupMode, Map<KafkaTopicPartition, Long> specificStartupOffsets, Properties cc, int sourceParallelism, String topicName, int valuesCount, int startFrom) throws Exception {
        HashMap<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset = new HashMap<Integer, Tuple2<Integer, Integer>>();
        for (int i = 0; i < sourceParallelism; ++i) {
            partitionsToValuesCountAndStartOffset.put(i, (Tuple2<Integer, Integer>)new Tuple2((Object)valuesCount, (Object)startFrom));
        }
        this.readSequence(env, startupMode, specificStartupOffsets, cc, topicName, partitionsToValuesCountAndStartOffset);
    }

    protected String writeSequence(String baseTopicName, final int numElements, final int parallelism, int replicationFactor) throws Exception {
        LOG.info("\n===================================\n== Writing sequence of " + numElements + " into " + baseTopicName + " with p=" + parallelism + "\n" + "===================================");
        TypeInformation resultType = TypeInformation.of((TypeHint)new TypeHint<Tuple2<Integer, Integer>>(){});
        KeyedSerializationSchemaWrapper serSchema = new KeyedSerializationSchemaWrapper((SerializationSchema)new TypeInformationSerializationSchema(resultType, new ExecutionConfig()));
        KeyedDeserializationSchemaWrapper deserSchema = new KeyedDeserializationSchemaWrapper((DeserializationSchema)new TypeInformationSerializationSchema(resultType, new ExecutionConfig()));
        int maxNumAttempts = 10;
        for (int attempt = 1; attempt <= 10; ++attempt) {
            boolean success;
            long delay;
            String topicName = baseTopicName + '-' + attempt;
            LOG.info("Writing attempt #1");
            KafkaConsumerTestBase.createTestTopic(topicName, parallelism, replicationFactor);
            StreamExecutionEnvironment writeEnv = StreamExecutionEnvironment.getExecutionEnvironment();
            writeEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
            writeEnv.getConfig().disableSysoutLogging();
            DataStreamSource stream = writeEnv.addSource((SourceFunction)new RichParallelSourceFunction<Tuple2<Integer, Integer>>(){
                private boolean running = true;

                public void run(SourceFunction.SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
                    int partition = this.getRuntimeContext().getIndexOfThisSubtask();
                    for (int cnt = 0; this.running && cnt < numElements; ++cnt) {
                        ctx.collect((Object)new Tuple2((Object)partition, (Object)cnt));
                    }
                }

                public void cancel() {
                    this.running = false;
                }
            }).setParallelism(parallelism);
            Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList((String)brokerConnectionStrings);
            producerProperties.setProperty("retries", "0");
            producerProperties.putAll((Map<?, ?>)secureProps);
            kafkaServer.produceIntoKafka(stream, topicName, serSchema, producerProperties, new Tuple2FlinkPartitioner(parallelism)).setParallelism(parallelism);
            try {
                writeEnv.execute("Write sequence");
            }
            catch (Exception e) {
                LOG.error("Write attempt failed, trying again", (Throwable)e);
                KafkaConsumerTestBase.deleteTestTopic(topicName);
                JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
                continue;
            }
            LOG.info("Finished writing sequence");
            LOG.info("Validating sequence");
            JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
            final StreamExecutionEnvironment readEnv = StreamExecutionEnvironment.getExecutionEnvironment();
            readEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
            readEnv.getConfig().disableSysoutLogging();
            readEnv.setParallelism(parallelism);
            Properties readProps = (Properties)standardProps.clone();
            readProps.setProperty("group.id", "flink-tests-validator");
            readProps.putAll((Map<?, ?>)secureProps);
            FlinkKafkaConsumerBase consumer = kafkaServer.getConsumer(topicName, deserSchema, readProps);
            readEnv.addSource(consumer).map((MapFunction)new RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>(){
                private final int totalCount;
                private int count;
                {
                    this.totalCount = parallelism * numElements;
                    this.count = 0;
                }

                public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
                    if (++this.count == this.totalCount) {
                        throw new SuccessException();
                    }
                    return value;
                }
            }).setParallelism(1).addSink((SinkFunction)new DiscardingSink()).setParallelism(1);
            final AtomicReference errorRef = new AtomicReference();
            Thread runner = new Thread(){

                @Override
                public void run() {
                    try {
                        TestUtils.tryExecute((StreamExecutionEnvironment)readEnv, (String)"sequence validation");
                    }
                    catch (Throwable t) {
                        errorRef.set(t);
                    }
                }
            };
            runner.start();
            long deadline = System.nanoTime() + 10000000000L;
            while (runner.isAlive() && (delay = deadline - System.nanoTime()) > 0L) {
                runner.join(delay / 1000000L);
            }
            if (runner.isAlive()) {
                success = false;
                JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
            } else {
                Throwable error = (Throwable)errorRef.get();
                if (error != null) {
                    success = false;
                    LOG.info("Attempt " + attempt + " failed with exception", error);
                } else {
                    success = true;
                }
            }
            JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
            if (success) {
                return topicName;
            }
            KafkaConsumerTestBase.deleteTestTopic(topicName);
        }
        throw new Exception("Could not write a valid sequence to Kafka after 10 attempts");
    }

    private static List<MessageAndMetadata<byte[], byte[]>> readTopicToList(String topicName, ConsumerConfig config, int stopAfter) {
        Map<String, Integer> topicCountMap;
        ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector((ConsumerConfig)config);
        Map streams = consumerConnector.createMessageStreams(topicCountMap = Collections.singletonMap(topicName, 1));
        if (streams.size() != 1) {
            throw new RuntimeException("Expected only one message stream but got " + streams.size());
        }
        List kafkaStreams = (List)streams.get(topicName);
        if (kafkaStreams == null) {
            throw new RuntimeException("Requested stream not available. Available streams: " + streams.toString());
        }
        if (kafkaStreams.size() != 1) {
            throw new RuntimeException("Requested 1 stream from Kafka, bot got " + kafkaStreams.size() + " streams");
        }
        LOG.info("Opening Consumer instance for topic '{}' on group '{}'", (Object)topicName, (Object)config.groupId());
        ConsumerIterator iteratorToRead = ((KafkaStream)kafkaStreams.get(0)).iterator();
        ArrayList<MessageAndMetadata<byte[], byte[]>> result = new ArrayList<MessageAndMetadata<byte[], byte[]>>();
        int read = 0;
        while (iteratorToRead.hasNext()) {
            result.add((MessageAndMetadata<byte[], byte[]>)iteratorToRead.next());
            if (++read != stopAfter) continue;
            LOG.info("Read " + read + " elements");
            return result;
        }
        return result;
    }

    private static void printTopic(String topicName, ConsumerConfig config, DeserializationSchema<?> deserializationSchema, int stopAfter) throws IOException {
        List<MessageAndMetadata<byte[], byte[]>> contents = KafkaConsumerTestBase.readTopicToList(topicName, config, stopAfter);
        LOG.info("Printing contents of topic {} in consumer grouo {}", (Object)topicName, (Object)config.groupId());
        for (MessageAndMetadata<byte[], byte[]> message : contents) {
            Object out = deserializationSchema.deserialize((byte[])message.message());
            LOG.info("Message: partition: {} offset: {} msg: {}", new Object[]{message.partition(), message.offset(), out.toString()});
        }
    }

    private static void printTopic(String topicName, int elements, DeserializationSchema<?> deserializer) throws IOException {
        Properties newProps = new Properties(standardProps);
        newProps.setProperty("group.id", "topic-printer" + UUID.randomUUID().toString());
        newProps.setProperty("auto.offset.reset", "smallest");
        newProps.setProperty("zookeeper.connect", standardProps.getProperty("zookeeper.connect"));
        newProps.putAll((Map<?, ?>)secureProps);
        ConsumerConfig printerConfig = new ConsumerConfig(newProps);
        KafkaConsumerTestBase.printTopic(topicName, printerConfig, deserializer, elements);
    }

    private static class Tuple2WithTopicSchema
    implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>,
    KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {
        private final TypeSerializer<Tuple2<Integer, Integer>> ts;

        public Tuple2WithTopicSchema(ExecutionConfig ec) {
            this.ts = TypeInfoParser.parse((String)"Tuple2<Integer, Integer>").createSerializer(ec);
        }

        public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
            DataInputViewStreamWrapper in = new DataInputViewStreamWrapper((InputStream)new ByteArrayInputStream(message));
            Tuple2 t2 = (Tuple2)this.ts.deserialize((DataInputView)in);
            return new Tuple3(t2.f0, t2.f1, (Object)topic);
        }

        public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) {
            return false;
        }

        public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() {
            return TypeInfoParser.parse((String)"Tuple3<Integer, Integer, String>");
        }

        public byte[] serializeKey(Tuple3<Integer, Integer, String> element) {
            return null;
        }

        public byte[] serializeValue(Tuple3<Integer, Integer, String> element) {
            ByteArrayOutputStream by = new ByteArrayOutputStream();
            DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper((OutputStream)by);
            try {
                this.ts.serialize((Object)new Tuple2(element.f0, element.f1), (DataOutputView)out);
            }
            catch (IOException e) {
                throw new RuntimeException("Error", e);
            }
            return by.toByteArray();
        }

        public String getTargetTopic(Tuple3<Integer, Integer, String> element) {
            return (String)element.f2;
        }
    }

    public static class BrokerKillingMapper<T>
    extends RichMapFunction<T, T>
    implements ListCheckpointed<Integer>,
    CheckpointListener {
        private static final long serialVersionUID = 6334389850158707313L;
        public static volatile boolean killedLeaderBefore;
        public static volatile boolean hasBeenCheckpointedBeforeFailure;
        private final int shutdownBrokerId;
        private final int failCount;
        private int numElementsTotal;
        private boolean failer;
        private boolean hasBeenCheckpointed;

        public BrokerKillingMapper(int shutdownBrokerId, int failCount) {
            this.shutdownBrokerId = shutdownBrokerId;
            this.failCount = failCount;
        }

        public void open(Configuration parameters) {
            this.failer = this.getRuntimeContext().getIndexOfThisSubtask() == 0;
        }

        public T map(T value) throws Exception {
            ++this.numElementsTotal;
            if (!killedLeaderBefore) {
                Thread.sleep(10L);
                if (this.failer && this.numElementsTotal >= this.failCount) {
                    KafkaServer toShutDown = null;
                    for (KafkaServer server : KafkaTestBase.kafkaServer.getBrokers()) {
                        if (KafkaTestBase.kafkaServer.getBrokerId(server) != this.shutdownBrokerId) continue;
                        toShutDown = server;
                        break;
                    }
                    if (toShutDown == null) {
                        StringBuilder listOfBrokers = new StringBuilder();
                        for (KafkaServer server : KafkaTestBase.kafkaServer.getBrokers()) {
                            listOfBrokers.append(KafkaTestBase.kafkaServer.getBrokerId(server));
                            listOfBrokers.append(" ; ");
                        }
                        throw new Exception("Cannot find broker to shut down: " + this.shutdownBrokerId + " ; available brokers: " + listOfBrokers.toString());
                    }
                    hasBeenCheckpointedBeforeFailure = this.hasBeenCheckpointed;
                    killedLeaderBefore = true;
                    toShutDown.shutdown();
                }
            }
            return value;
        }

        public void notifyCheckpointComplete(long checkpointId) {
            this.hasBeenCheckpointed = true;
        }

        public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
            return Collections.singletonList(this.numElementsTotal);
        }

        public void restoreState(List<Integer> state) throws Exception {
            if (state.isEmpty() || state.size() > 1) {
                throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
            }
            this.numElementsTotal = state.get(0);
        }
    }

    public static class FixedNumberDeserializationSchema
    implements DeserializationSchema<Tuple2<Integer, Integer>> {
        final int finalCount;
        int count = 0;
        TypeInformation<Tuple2<Integer, Integer>> ti = TypeInfoParser.parse((String)"Tuple2<Integer, Integer>");
        TypeSerializer<Tuple2<Integer, Integer>> ser = this.ti.createSerializer(new ExecutionConfig());

        public FixedNumberDeserializationSchema(int finalCount) {
            this.finalCount = finalCount;
        }

        public Tuple2<Integer, Integer> deserialize(byte[] message) throws IOException {
            DataInputViewStreamWrapper in = new DataInputViewStreamWrapper((InputStream)new ByteArrayInputStream(message));
            return (Tuple2)this.ser.deserialize((DataInputView)in);
        }

        public boolean isEndOfStream(Tuple2<Integer, Integer> nextElement) {
            return ++this.count >= this.finalCount;
        }

        public TypeInformation<Tuple2<Integer, Integer>> getProducedType() {
            return this.ti;
        }
    }

    public static class PojoValue {
        public Date when;
        public long lon;
        public long lat;
    }
}

