package org.apache.flink.streaming.connectors.kafka;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.QueryExp;
import kafka.common.NotLeaderForPartitionException;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
import kafka.server.KafkaServer;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.client.JobCancellationException;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper;
import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper;
import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2FlinkPartitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.testutils.junit.RetryOnException;
import org.apache.flink.testutils.junit.RetryRule;
import org.apache.flink.util.Collector;
import org.apache.kafka.common.errors.TimeoutException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.class */
public abstract class KafkaConsumerTestBase extends KafkaTestBase {

    @Rule
    public RetryRule retryRule = new RetryRule();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase$27, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase$27.class */
    public static /* synthetic */ class AnonymousClass27 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$streaming$connectors$kafka$config$StartupMode = new int[StartupMode.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$streaming$connectors$kafka$config$StartupMode[StartupMode.EARLIEST.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$connectors$kafka$config$StartupMode[StartupMode.LATEST.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$connectors$kafka$config$StartupMode[StartupMode.SPECIFIC_OFFSETS.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$connectors$kafka$config$StartupMode[StartupMode.GROUP_OFFSETS.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase$BrokerKillingMapper.class */
    public static class BrokerKillingMapper<T> extends RichMapFunction<T, T> implements ListCheckpointed<Integer>, CheckpointListener {
        private static final long serialVersionUID = 6334389850158707313L;
        public static volatile boolean killedLeaderBefore;
        public static volatile boolean hasBeenCheckpointedBeforeFailure;
        private final int shutdownBrokerId;
        private final int failCount;
        private int numElementsTotal;
        private boolean failer;
        private boolean hasBeenCheckpointed;

        public BrokerKillingMapper(int i, int i2) {
            this.shutdownBrokerId = i;
            this.failCount = i2;
        }

        public void open(Configuration configuration) {
            this.failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
        }

        public T map(T t) throws Exception {
            this.numElementsTotal++;
            if (!killedLeaderBefore) {
                Thread.sleep(10L);
                if (this.failer && this.numElementsTotal >= this.failCount) {
                    KafkaServer kafkaServer = null;
                    Iterator<KafkaServer> it = KafkaTestBase.kafkaServer.getBrokers().iterator();
                    while (true) {
                        if (!it.hasNext()) {
                            break;
                        }
                        KafkaServer next = it.next();
                        if (KafkaTestBase.kafkaServer.getBrokerId(next) == this.shutdownBrokerId) {
                            kafkaServer = next;
                            break;
                        }
                    }
                    if (kafkaServer == null) {
                        StringBuilder sb = new StringBuilder();
                        Iterator<KafkaServer> it2 = KafkaTestBase.kafkaServer.getBrokers().iterator();
                        while (it2.hasNext()) {
                            sb.append(KafkaTestBase.kafkaServer.getBrokerId(it2.next()));
                            sb.append(" ; ");
                        }
                        throw new Exception("Cannot find broker to shut down: " + this.shutdownBrokerId + " ; available brokers: " + sb.toString());
                    }
                    hasBeenCheckpointedBeforeFailure = this.hasBeenCheckpointed;
                    killedLeaderBefore = true;
                    kafkaServer.shutdown();
                }
            }
            return t;
        }

        public void notifyCheckpointComplete(long j) {
            this.hasBeenCheckpointed = true;
        }

        public List<Integer> snapshotState(long j, long j2) throws Exception {
            return Collections.singletonList(Integer.valueOf(this.numElementsTotal));
        }

        public void restoreState(List<Integer> list) throws Exception {
            if (list.isEmpty() || list.size() > 1) {
                throw new RuntimeException("Test failed due to unexpected recovered state size " + list.size());
            }
            this.numElementsTotal = list.get(0).intValue();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase$FixedNumberDeserializationSchema.class */
    public static class FixedNumberDeserializationSchema implements DeserializationSchema<Tuple2<Integer, Integer>> {
        final int finalCount;
        int count = 0;
        TypeInformation<Tuple2<Integer, Integer>> ti = TypeInfoParser.parse("Tuple2<Integer, Integer>");
        TypeSerializer<Tuple2<Integer, Integer>> ser = this.ti.createSerializer(new ExecutionConfig());

        public FixedNumberDeserializationSchema(int i) {
            this.finalCount = i;
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public Tuple2<Integer, Integer> m10deserialize(byte[] bArr) throws IOException {
            return (Tuple2) this.ser.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(bArr)));
        }

        public boolean isEndOfStream(Tuple2<Integer, Integer> tuple2) {
            int i = this.count + 1;
            this.count = i;
            return i >= this.finalCount;
        }

        public TypeInformation<Tuple2<Integer, Integer>> getProducedType() {
            return this.ti;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase$PojoValue.class */
    public static class PojoValue {
        public Date when;
        public long lon;
        public long lat;
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase$Tuple2WithTopicSchema.class */
    private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>, KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {
        private final TypeSerializer<Tuple2<Integer, Integer>> ts;

        public Tuple2WithTopicSchema(ExecutionConfig executionConfig) {
            this.ts = TypeInfoParser.parse("Tuple2<Integer, Integer>").createSerializer(executionConfig);
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public Tuple3<Integer, Integer, String> m11deserialize(byte[] bArr, byte[] bArr2, String str, int i, long j) throws IOException {
            Tuple2 tuple2 = (Tuple2) this.ts.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(bArr2)));
            return new Tuple3<>(tuple2.f0, tuple2.f1, str);
        }

        public boolean isEndOfStream(Tuple3<Integer, Integer, String> tuple3) {
            return false;
        }

        public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() {
            return TypeInfoParser.parse("Tuple3<Integer, Integer, String>");
        }

        public byte[] serializeKey(Tuple3<Integer, Integer, String> tuple3) {
            return null;
        }

        public byte[] serializeValue(Tuple3<Integer, Integer, String> tuple3) {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            try {
                this.ts.serialize(new Tuple2(tuple3.f0, tuple3.f1), new DataOutputViewStreamWrapper(byteArrayOutputStream));
                return byteArrayOutputStream.toByteArray();
            } catch (IOException e) {
                throw new RuntimeException("Error", e);
            }
        }

        public String getTargetTopic(Tuple3<Integer, Integer, String> tuple3) {
            return (String) tuple3.f2;
        }
    }

    @Before
    public void ensureNoJobIsLingering() throws Exception {
        JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
    }

    public void runFailOnNoBrokerTest() throws Exception {
        try {
            Properties properties = new Properties();
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.getConfig().disableSysoutLogging();
            executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
            executionEnvironment.setParallelism(1);
            properties.setProperty("bootstrap.servers", "localhost:80");
            properties.setProperty("zookeeper.connect", "localhost:80");
            properties.setProperty("group.id", "test");
            properties.setProperty("request.timeout.ms", "3000");
            properties.setProperty("socket.timeout.ms", "3000");
            properties.setProperty("session.timeout.ms", "2000");
            properties.setProperty("fetch.max.wait.ms", "2000");
            properties.setProperty("heartbeat.interval.ms", "1000");
            properties.putAll(secureProps);
            executionEnvironment.addSource(kafkaServer.getConsumer("doesntexist", (DeserializationSchema) new SimpleStringSchema(), properties)).print();
            executionEnvironment.execute("No broker test");
        } catch (JobExecutionException e) {
            if (kafkaServer.getVersion().equals("0.9") || kafkaServer.getVersion().equals("0.10")) {
                Assert.assertTrue(e.getCause() instanceof TimeoutException);
                Assert.assertEquals("Timeout expired while fetching topic metadata", e.getCause().getMessage());
            } else {
                Assert.assertTrue(e.getCause() instanceof RuntimeException);
                Assert.assertTrue(((RuntimeException) e.getCause()).getMessage().contains("Unable to retrieve any partitions for the requested topics [doesntexist]"));
            }
        }
    }

    public void runCommitOffsetsToKafka() throws Exception {
        String writeSequence = writeSequence("testCommitOffsetsToKafkaTopic", 50, 3, 1);
        final StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.getConfig().disableSysoutLogging();
        executionEnvironment.getConfig().setRestartStrategy(RestartStrategies.noRestart());
        executionEnvironment.setParallelism(3);
        executionEnvironment.enableCheckpointing(200L);
        executionEnvironment.addSource(kafkaServer.getConsumer(writeSequence, (DeserializationSchema) new SimpleStringSchema(), standardProps)).addSink(new DiscardingSink());
        final AtomicReference atomicReference = new AtomicReference();
        new Thread("runner") { // from class: org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    executionEnvironment.execute();
                } catch (Throwable th) {
                    if (th.getCause() instanceof JobCancellationException) {
                        return;
                    }
                    atomicReference.set(th);
                }
            }
        }.start();
        Long l = 50L;
        long nanoTime = 30000000000L + System.nanoTime();
        KafkaTestEnvironment.KafkaOffsetHandler createOffsetHandler = kafkaServer.createOffsetHandler();
        do {
            Long committedOffset = createOffsetHandler.getCommittedOffset(writeSequence, 0);
            Long committedOffset2 = createOffsetHandler.getCommittedOffset(writeSequence, 1);
            Long committedOffset3 = createOffsetHandler.getCommittedOffset(writeSequence, 2);
            if (l.equals(committedOffset) && l.equals(committedOffset2) && l.equals(committedOffset3)) {
                break;
            } else {
                Thread.sleep(100L);
            }
        } while (System.nanoTime() < nanoTime);
        JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
        Throwable th = (Throwable) atomicReference.get();
        if (th != null) {
            throw new RuntimeException("Job failed with an exception", th);
        }
        Long committedOffset4 = createOffsetHandler.getCommittedOffset(writeSequence, 0);
        Long committedOffset5 = createOffsetHandler.getCommittedOffset(writeSequence, 1);
        Long committedOffset6 = createOffsetHandler.getCommittedOffset(writeSequence, 2);
        Assert.assertEquals(50L, committedOffset4);
        Assert.assertEquals(50L, committedOffset5);
        Assert.assertEquals(50L, committedOffset6);
        createOffsetHandler.close();
        deleteTestTopic(writeSequence);
    }

    public void runStartFromKafkaCommitOffsets() throws Exception {
        Long committedOffset;
        Long committedOffset2;
        Long committedOffset3;
        String writeSequence = writeSequence("testStartFromKafkaCommitOffsetsTopic", 300, 3, 1);
        KafkaTestEnvironment.KafkaOffsetHandler createOffsetHandler = kafkaServer.createOffsetHandler();
        int i = 0;
        do {
            i++;
            LOG.info("Attempt " + i + " to read records and commit some offsets to Kafka");
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.getConfig().disableSysoutLogging();
            executionEnvironment.getConfig().setRestartStrategy(RestartStrategies.noRestart());
            executionEnvironment.setParallelism(3);
            executionEnvironment.enableCheckpointing(20L);
            executionEnvironment.addSource(kafkaServer.getConsumer(writeSequence, (DeserializationSchema) new SimpleStringSchema(), standardProps)).map(new ThrottledMapper(50)).map(new MapFunction<String, Object>() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.2
                int count = 0;

                public Object map(String str) throws Exception {
                    this.count++;
                    if (this.count == 150) {
                        throw new SuccessException();
                    }
                    return null;
                }
            }).addSink(new DiscardingSink());
            TestUtils.tryExecute(executionEnvironment, "Read some records to commit offsets to Kafka");
            committedOffset = createOffsetHandler.getCommittedOffset(writeSequence, 0);
            committedOffset2 = createOffsetHandler.getCommittedOffset(writeSequence, 1);
            committedOffset3 = createOffsetHandler.getCommittedOffset(writeSequence, 2);
            if (committedOffset != null || committedOffset2 != null || committedOffset3 != null) {
                break;
            }
        } while (i < 3);
        if (committedOffset == null && committedOffset2 == null && committedOffset3 == null) {
            throw new RuntimeException("No offsets have been committed after 3 attempts");
        }
        LOG.info("Got final committed offsets from Kafka o1={}, o2={}, o3={}", new Object[]{committedOffset, committedOffset2, committedOffset3});
        StreamExecutionEnvironment executionEnvironment2 = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment2.getConfig().disableSysoutLogging();
        executionEnvironment2.getConfig().setRestartStrategy(RestartStrategies.noRestart());
        executionEnvironment2.setParallelism(3);
        HashMap hashMap = new HashMap();
        hashMap.put(0, new Tuple2(Integer.valueOf(committedOffset != null ? (int) (300 - committedOffset.longValue()) : 300), Integer.valueOf(committedOffset != null ? committedOffset.intValue() : 0)));
        hashMap.put(1, new Tuple2(Integer.valueOf(committedOffset2 != null ? (int) (300 - committedOffset2.longValue()) : 300), Integer.valueOf(committedOffset2 != null ? committedOffset2.intValue() : 0)));
        hashMap.put(2, new Tuple2(Integer.valueOf(committedOffset3 != null ? (int) (300 - committedOffset3.longValue()) : 300), Integer.valueOf(committedOffset3 != null ? committedOffset3.intValue() : 0)));
        readSequence(executionEnvironment2, StartupMode.GROUP_OFFSETS, null, standardProps, writeSequence, hashMap);
        createOffsetHandler.close();
        deleteTestTopic(writeSequence);
    }

    public void runAutoOffsetRetrievalAndCommitToKafka() throws Exception {
        String writeSequence = writeSequence("testAutoOffsetRetrievalAndCommitToKafkaTopic", 50, 3, 1);
        final StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.getConfig().disableSysoutLogging();
        executionEnvironment.getConfig().setRestartStrategy(RestartStrategies.noRestart());
        executionEnvironment.setParallelism(3);
        executionEnvironment.enableCheckpointing(200L);
        Properties properties = new Properties();
        properties.putAll(standardProps);
        properties.setProperty("auto.offset.reset", "latest");
        executionEnvironment.addSource(kafkaServer.getConsumer(writeSequence, (DeserializationSchema) new SimpleStringSchema(), properties)).addSink(new DiscardingSink());
        final AtomicReference atomicReference = new AtomicReference();
        new Thread("runner") { // from class: org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.3
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    executionEnvironment.execute();
                } catch (Throwable th) {
                    if (th.getCause() instanceof JobCancellationException) {
                        return;
                    }
                    atomicReference.set(th);
                }
            }
        }.start();
        KafkaTestEnvironment.KafkaOffsetHandler createOffsetHandler = kafkaServer.createOffsetHandler();
        Long l = 50L;
        long nanoTime = 30000000000L + System.nanoTime();
        do {
            Long committedOffset = createOffsetHandler.getCommittedOffset(writeSequence, 0);
            Long committedOffset2 = createOffsetHandler.getCommittedOffset(writeSequence, 1);
            Long committedOffset3 = createOffsetHandler.getCommittedOffset(writeSequence, 2);
            if (l.equals(committedOffset) && l.equals(committedOffset2) && l.equals(committedOffset3)) {
                break;
            } else {
                Thread.sleep(100L);
            }
        } while (System.nanoTime() < nanoTime);
        JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
        Throwable th = (Throwable) atomicReference.get();
        if (th != null) {
            throw new RuntimeException("Job failed with an exception", th);
        }
        Long committedOffset4 = createOffsetHandler.getCommittedOffset(writeSequence, 0);
        Long committedOffset5 = createOffsetHandler.getCommittedOffset(writeSequence, 1);
        Long committedOffset6 = createOffsetHandler.getCommittedOffset(writeSequence, 2);
        Assert.assertEquals(50L, committedOffset4);
        Assert.assertEquals(50L, committedOffset5);
        Assert.assertEquals(50L, committedOffset6);
        createOffsetHandler.close();
        deleteTestTopic(writeSequence);
    }

    public void runStartFromEarliestOffsets() throws Exception {
        String writeSequence = writeSequence("testStartFromEarliestOffsetsTopic", 50, 3, 1);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.getConfig().disableSysoutLogging();
        executionEnvironment.setParallelism(3);
        Properties properties = new Properties();
        properties.putAll(standardProps);
        properties.setProperty("auto.offset.reset", "latest");
        KafkaTestEnvironment.KafkaOffsetHandler createOffsetHandler = kafkaServer.createOffsetHandler();
        createOffsetHandler.setCommittedOffset(writeSequence, 0, 23L);
        createOffsetHandler.setCommittedOffset(writeSequence, 1, 31L);
        createOffsetHandler.setCommittedOffset(writeSequence, 2, 43L);
        readSequence(executionEnvironment, StartupMode.EARLIEST, null, properties, 3, writeSequence, 50, 0);
        createOffsetHandler.close();
        deleteTestTopic(writeSequence);
    }

    public void runStartFromLatestOffsets() throws Exception {
        String writeSequence = writeSequence("testStartFromLatestOffsetsTopic", 50, 3, 1);
        KafkaTestEnvironment.KafkaOffsetHandler createOffsetHandler = kafkaServer.createOffsetHandler();
        createOffsetHandler.setCommittedOffset(writeSequence, 0, 23L);
        createOffsetHandler.setCommittedOffset(writeSequence, 1, 31L);
        createOffsetHandler.setCommittedOffset(writeSequence, 2, 43L);
        TypeInformation of = TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.4
        });
        KeyedSerializationSchemaWrapper keyedSerializationSchemaWrapper = new KeyedSerializationSchemaWrapper(new TypeInformationSerializationSchema(of, new ExecutionConfig()));
        KeyedDeserializationSchemaWrapper keyedDeserializationSchemaWrapper = new KeyedDeserializationSchemaWrapper(new TypeInformationSerializationSchema(of, new ExecutionConfig()));
        final StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.getConfig().disableSysoutLogging();
        executionEnvironment.setParallelism(3);
        Properties properties = new Properties();
        properties.putAll(standardProps);
        properties.setProperty("auto.offset.reset", "earliest");
        FlinkKafkaConsumerBase consumer = kafkaServer.getConsumer(writeSequence, (KeyedDeserializationSchema) keyedDeserializationSchemaWrapper, properties);
        consumer.setStartFromLatest();
        executionEnvironment.addSource(consumer).setParallelism(3).flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Object>() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.5
            public void flatMap(Tuple2<Integer, Integer> tuple2, Collector<Object> collector) throws Exception {
                if (((Integer) tuple2.f1).intValue() - 50 < 0) {
                    throw new RuntimeException("test failed; consumed a record that was previously written: " + tuple2);
                }
            }

            public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
                flatMap((Tuple2<Integer, Integer>) obj, (Collector<Object>) collector);
            }
        }).setParallelism(1).addSink(new DiscardingSink());
        final AtomicReference atomicReference = new AtomicReference();
        Thread thread = new Thread(new Runnable() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.6
            @Override // java.lang.Runnable
            public void run() {
                try {
                    executionEnvironment.execute("Consume Extra Records Job");
                } catch (Throwable th) {
                    if (th instanceof JobCancellationException) {
                        return;
                    }
                    atomicReference.set(th);
                }
            }
        });
        thread.start();
        JobManagerCommunicationUtils.waitUntilJobIsRunning(flink.getLeaderGateway(timeout), "Consume Extra Records Job");
        StreamExecutionEnvironment executionEnvironment2 = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment2.setParallelism(3);
        kafkaServer.produceIntoKafka(executionEnvironment2.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.7
            private boolean running = true;

            public void run(SourceFunction.SourceContext<Tuple2<Integer, Integer>> sourceContext) throws Exception {
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                for (int i = 50; this.running && i < 250; i++) {
                    sourceContext.collect(new Tuple2(Integer.valueOf(indexOfThisSubtask), Integer.valueOf(i)));
                }
            }

            public void cancel() {
                this.running = false;
            }
        }), writeSequence, keyedSerializationSchemaWrapper, properties, null);
        try {
            executionEnvironment2.execute("Write Extra Records Job");
            JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout), "Consume Extra Records Job");
            thread.join();
            createOffsetHandler.close();
            deleteTestTopic(writeSequence);
            Throwable th = (Throwable) atomicReference.get();
            if (th != null) {
                throw new Exception("Exception in the consuming thread", th);
            }
        } catch (Exception e) {
            throw new RuntimeException("Writing extra records failed", e);
        }
    }

    public void runStartFromGroupOffsets() throws Exception {
        String writeSequence = writeSequence("testStartFromGroupOffsetsTopic", 50, 3, 1);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.getConfig().disableSysoutLogging();
        executionEnvironment.setParallelism(3);
        Properties properties = new Properties();
        properties.putAll(standardProps);
        properties.setProperty("auto.offset.reset", "earliest");
        KafkaTestEnvironment.KafkaOffsetHandler createOffsetHandler = kafkaServer.createOffsetHandler();
        createOffsetHandler.setCommittedOffset(writeSequence, 0, 23L);
        createOffsetHandler.setCommittedOffset(writeSequence, 2, 43L);
        HashMap hashMap = new HashMap();
        hashMap.put(0, new Tuple2<>(27, 23));
        hashMap.put(1, new Tuple2<>(50, 0));
        hashMap.put(2, new Tuple2<>(7, 43));
        readSequence(executionEnvironment, StartupMode.GROUP_OFFSETS, null, properties, writeSequence, hashMap);
        createOffsetHandler.close();
        deleteTestTopic(writeSequence);
    }

    public void runStartFromSpecificOffsets() throws Exception {
        String writeSequence = writeSequence("testStartFromSpecificOffsetsTopic", 50, 4, 1);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.getConfig().disableSysoutLogging();
        executionEnvironment.setParallelism(4);
        Properties properties = new Properties();
        properties.putAll(standardProps);
        properties.setProperty("auto.offset.reset", "earliest");
        HashMap hashMap = new HashMap();
        hashMap.put(new KafkaTopicPartition(writeSequence, 0), 19L);
        hashMap.put(new KafkaTopicPartition(writeSequence, 2), 22L);
        hashMap.put(new KafkaTopicPartition(writeSequence, 4), 26L);
        KafkaTestEnvironment.KafkaOffsetHandler createOffsetHandler = kafkaServer.createOffsetHandler();
        createOffsetHandler.setCommittedOffset(writeSequence, 0, 23L);
        createOffsetHandler.setCommittedOffset(writeSequence, 1, 31L);
        createOffsetHandler.setCommittedOffset(writeSequence, 2, 43L);
        HashMap hashMap2 = new HashMap();
        hashMap2.put(0, new Tuple2<>(31, 19));
        hashMap2.put(1, new Tuple2<>(19, 31));
        hashMap2.put(2, new Tuple2<>(28, 22));
        hashMap2.put(3, new Tuple2<>(50, 0));
        readSequence(executionEnvironment, StartupMode.SPECIFIC_OFFSETS, hashMap, properties, writeSequence, hashMap2);
        createOffsetHandler.close();
        deleteTestTopic(writeSequence);
    }

    @RetryOnException(times = 2, exception = NotLeaderForPartitionException.class)
    public void runSimpleConcurrentProducerConsumerTopology() throws Exception {
        String str = "concurrentProducerConsumerTopic_" + UUID.randomUUID().toString();
        String str2 = "additionalEmptyTopic_" + UUID.randomUUID().toString();
        createTestTopic(str, 3, 2);
        createTestTopic(str2, 3, 1);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(3);
        executionEnvironment.enableCheckpointing(500L);
        executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
        executionEnvironment.getConfig().disableSysoutLogging();
        TypeInformation parse = TypeInfoParser.parse("Tuple2<Long, String>");
        TypeInformationSerializationSchema typeInformationSerializationSchema = new TypeInformationSerializationSchema(parse, executionEnvironment.getConfig());
        TypeInformationSerializationSchema typeInformationSerializationSchema2 = new TypeInformationSerializationSchema(parse, executionEnvironment.getConfig());
        DataStreamSource addSource = executionEnvironment.addSource(new RichParallelSourceFunction<Tuple2<Long, String>>() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.8
            private boolean running = true;

            public void run(SourceFunction.SourceContext<Tuple2<Long, String>> sourceContext) throws InterruptedException {
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask() * 100;
                int i = indexOfThisSubtask + 100;
                while (this.running && indexOfThisSubtask < i) {
                    sourceContext.collect(new Tuple2(Long.valueOf(1000 + indexOfThisSubtask), "kafka-" + indexOfThisSubtask));
                    indexOfThisSubtask++;
                    Thread.sleep(50L);
                }
            }

            public void cancel() {
                this.running = false;
            }
        });
        Properties propertiesFromBrokerList = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
        propertiesFromBrokerList.setProperty("retries", "3");
        propertiesFromBrokerList.putAll(secureProps);
        kafkaServer.produceIntoKafka(addSource, str, new KeyedSerializationSchemaWrapper(typeInformationSerializationSchema2), propertiesFromBrokerList, null);
        ArrayList arrayList = new ArrayList();
        arrayList.add(str);
        arrayList.add(str2);
        Properties properties = new Properties();
        properties.putAll(standardProps);
        properties.putAll(secureProps);
        executionEnvironment.addSource(kafkaServer.getConsumer((List<String>) arrayList, (DeserializationSchema) typeInformationSerializationSchema, properties)).setParallelism(3).addSink(new RichSinkFunction<Tuple2<Long, String>>() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.9
            private int elCnt = 0;
            private BitSet validator = new BitSet(300);

            public void invoke(Tuple2<Long, String> tuple2) throws Exception {
                int parseInt = Integer.parseInt(((String) tuple2.f1).split("-")[1]);
                Assert.assertEquals(((Long) tuple2.f0).longValue() - 1000, parseInt);
                Assert.assertFalse("Received tuple twice", this.validator.get(parseInt));
                this.validator.set(parseInt);
                this.elCnt++;
                if (this.elCnt == 300) {
                    int nextClearBit = this.validator.nextClearBit(0);
                    if (nextClearBit != 300) {
                        Assert.fail("The bitset was not set to 1 on all elements. Next clear:" + nextClearBit + " Set: " + this.validator);
                    }
                    throw new SuccessException();
                }
            }

            public void close() throws Exception {
                super.close();
            }
        }).setParallelism(1);
        try {
            tryExecutePropagateExceptions(executionEnvironment, "runSimpleConcurrentProducerConsumerTopology");
            deleteTestTopic(str);
        } catch (ProgramInvocationException | JobExecutionException e) {
            int i = 0;
            for (Throwable cause = e.getCause(); cause != null; cause = cause.getCause()) {
                int i2 = i;
                i++;
                if (i2 >= 20) {
                    break;
                } else {
                    if (cause instanceof NotLeaderForPartitionException) {
                        throw ((Exception) cause);
                    }
                }
            }
            throw e;
        }
    }

    public void runOneToOneExactlyOnceTest() throws Exception {
        createTestTopic("oneToOneTopic", 5, 1);
        DataGenerators.generateRandomizedIntegerSequence(StreamExecutionEnvironment.getExecutionEnvironment(), kafkaServer, "oneToOneTopic", 5, 1000, true);
        TypeInformationSerializationSchema typeInformationSerializationSchema = new TypeInformationSerializationSchema(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.enableCheckpointing(500L);
        executionEnvironment.setParallelism(5);
        executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
        executionEnvironment.getConfig().disableSysoutLogging();
        Properties properties = new Properties();
        properties.putAll(standardProps);
        properties.putAll(secureProps);
        executionEnvironment.addSource(kafkaServer.getConsumer("oneToOneTopic", (DeserializationSchema) typeInformationSerializationSchema, properties)).map(new PartitionValidatingMapper(5, 1)).map(new FailingIdentityMapper(333)).addSink(new ValidatingExactlyOnceSink(5000)).setParallelism(1);
        FailingIdentityMapper.failedBefore = false;
        TestUtils.tryExecute(executionEnvironment, "One-to-one exactly once test");
        deleteTestTopic("oneToOneTopic");
    }

    public void runOneSourceMultiplePartitionsExactlyOnceTest() throws Exception {
        createTestTopic("oneToManyTopic", 5, 1);
        DataGenerators.generateRandomizedIntegerSequence(StreamExecutionEnvironment.getExecutionEnvironment(), kafkaServer, "oneToManyTopic", 5, 1000, false);
        TypeInformationSerializationSchema typeInformationSerializationSchema = new TypeInformationSerializationSchema(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.enableCheckpointing(500L);
        executionEnvironment.setParallelism(2);
        executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
        executionEnvironment.getConfig().disableSysoutLogging();
        Properties properties = new Properties();
        properties.putAll(standardProps);
        properties.putAll(secureProps);
        executionEnvironment.addSource(kafkaServer.getConsumer("oneToManyTopic", (DeserializationSchema) typeInformationSerializationSchema, properties)).map(new PartitionValidatingMapper(5, 3)).map(new FailingIdentityMapper(333)).addSink(new ValidatingExactlyOnceSink(5000)).setParallelism(1);
        FailingIdentityMapper.failedBefore = false;
        TestUtils.tryExecute(executionEnvironment, "One-source-multi-partitions exactly once test");
        deleteTestTopic("oneToManyTopic");
    }

    public void runMultipleSourcesOnePartitionExactlyOnceTest() throws Exception {
        createTestTopic("manyToOneTopic", 5, 1);
        DataGenerators.generateRandomizedIntegerSequence(StreamExecutionEnvironment.getExecutionEnvironment(), kafkaServer, "manyToOneTopic", 5, 1000, true);
        TypeInformationSerializationSchema typeInformationSerializationSchema = new TypeInformationSerializationSchema(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.enableCheckpointing(500L);
        executionEnvironment.setParallelism(8);
        executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
        executionEnvironment.getConfig().disableSysoutLogging();
        executionEnvironment.setBufferTimeout(0L);
        Properties properties = new Properties();
        properties.putAll(standardProps);
        properties.putAll(secureProps);
        executionEnvironment.addSource(kafkaServer.getConsumer("manyToOneTopic", (DeserializationSchema) typeInformationSerializationSchema, properties)).map(new PartitionValidatingMapper(5, 1)).map(new FailingIdentityMapper(333)).addSink(new ValidatingExactlyOnceSink(5000)).setParallelism(1);
        FailingIdentityMapper.failedBefore = false;
        TestUtils.tryExecute(executionEnvironment, "multi-source-one-partitions exactly once test");
        deleteTestTopic("manyToOneTopic");
    }

    public void runCancelingOnFullInputTest() throws Exception {
        createTestTopic("cancelingOnFullTopic", 3, 1);
        DataGenerators.InfiniteStringsGenerator infiniteStringsGenerator = new DataGenerators.InfiniteStringsGenerator(kafkaServer, "cancelingOnFullTopic");
        infiniteStringsGenerator.start();
        final AtomicReference atomicReference = new AtomicReference();
        Thread thread = new Thread(new Runnable() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.10
            @Override // java.lang.Runnable
            public void run() {
                try {
                    StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
                    executionEnvironment.setParallelism(3);
                    executionEnvironment.enableCheckpointing(100L);
                    executionEnvironment.getConfig().disableSysoutLogging();
                    Properties properties = new Properties();
                    properties.putAll(KafkaTestBase.standardProps);
                    properties.putAll(KafkaTestBase.secureProps);
                    executionEnvironment.addSource(KafkaTestBase.kafkaServer.getConsumer("cancelingOnFullTopic", (DeserializationSchema) new SimpleStringSchema(), properties)).addSink(new DiscardingSink());
                    executionEnvironment.execute("Runner for CancelingOnFullInputTest");
                } catch (Throwable th) {
                    atomicReference.set(th);
                }
            }
        }, "program runner thread");
        thread.start();
        Thread.sleep(2000L);
        Throwable th = (Throwable) atomicReference.get();
        if (th != null) {
            th.printStackTrace();
            Assert.fail("Test failed prematurely with: " + th.getMessage());
        }
        JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout), "Runner for CancelingOnFullInputTest");
        thread.join();
        Throwable th2 = (Throwable) atomicReference.get();
        Assert.assertNotNull("program did not fail properly due to canceling", th2);
        Assert.assertTrue(th2.getMessage().contains("Job was cancelled"));
        if (infiniteStringsGenerator.isAlive()) {
            infiniteStringsGenerator.shutdown();
            infiniteStringsGenerator.join();
        } else {
            Throwable error = infiniteStringsGenerator.getError();
            if (error != null) {
                error.printStackTrace();
                Assert.fail("Generator failed: " + error.getMessage());
            } else {
                Assert.fail("Generator failed with no exception");
            }
        }
        deleteTestTopic("cancelingOnFullTopic");
    }

    public void runCancelingOnEmptyInputTest() throws Exception {
        createTestTopic("cancelingOnEmptyInputTopic", 3, 1);
        final AtomicReference atomicReference = new AtomicReference();
        Thread thread = new Thread(new Runnable() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.11
            @Override // java.lang.Runnable
            public void run() {
                try {
                    StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
                    executionEnvironment.setParallelism(3);
                    executionEnvironment.enableCheckpointing(100L);
                    executionEnvironment.getConfig().disableSysoutLogging();
                    Properties properties = new Properties();
                    properties.putAll(KafkaTestBase.standardProps);
                    properties.putAll(KafkaTestBase.secureProps);
                    executionEnvironment.addSource(KafkaTestBase.kafkaServer.getConsumer("cancelingOnEmptyInputTopic", (DeserializationSchema) new SimpleStringSchema(), properties)).addSink(new DiscardingSink());
                    executionEnvironment.execute("CancelingOnEmptyInputTest");
                } catch (Throwable th) {
                    KafkaTestBase.LOG.error("Job Runner failed with exception", th);
                    atomicReference.set(th);
                }
            }
        }, "program runner thread");
        thread.start();
        Thread.sleep(2000L);
        Throwable th = (Throwable) atomicReference.get();
        if (th != null) {
            th.printStackTrace();
            Assert.fail("Test failed prematurely with: " + th.getMessage());
        }
        JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
        thread.join();
        Throwable th2 = (Throwable) atomicReference.get();
        Assert.assertNotNull("program did not fail properly due to canceling", th2);
        Assert.assertTrue(th2.getMessage().contains("Job was cancelled"));
        deleteTestTopic("cancelingOnEmptyInputTopic");
    }

    public void runFailOnDeployTest() throws Exception {
        createTestTopic("failOnDeployTopic", 2, 1);
        TypeInformationSerializationSchema typeInformationSerializationSchema = new TypeInformationSerializationSchema(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(12);
        executionEnvironment.getConfig().disableSysoutLogging();
        Properties properties = new Properties();
        properties.putAll(standardProps);
        properties.putAll(secureProps);
        executionEnvironment.addSource(kafkaServer.getConsumer("failOnDeployTopic", (DeserializationSchema) typeInformationSerializationSchema, properties)).addSink(new DiscardingSink());
        try {
            executionEnvironment.execute("test fail on deploy");
            Assert.fail("this test should fail with an exception");
        } catch (JobExecutionException e) {
            Throwable cause = e.getCause();
            int i = 0;
            boolean z = false;
            while (true) {
                if (cause == null) {
                    break;
                }
                int i2 = i;
                i++;
                if (i2 >= 20) {
                    break;
                }
                if (cause instanceof NoResourceAvailableException) {
                    z = true;
                    break;
                }
                cause = cause.getCause();
            }
            Assert.assertTrue("Wrong exception", z);
        }
        deleteTestTopic("failOnDeployTopic");
    }

    public void runProduceConsumeMultipleTopics() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.getConfig().disableSysoutLogging();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 5; i++) {
            String str = "topic-" + i;
            arrayList.add(str);
            createTestTopic(str, i + 1, 1);
        }
        executionEnvironment.setParallelism(1);
        DataStreamSource addSource = executionEnvironment.addSource(new RichParallelSourceFunction<Tuple3<Integer, Integer, String>>() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.12
            public void run(SourceFunction.SourceContext<Tuple3<Integer, Integer, String>> sourceContext) throws Exception {
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                for (int i2 = 0; i2 < 5; i2++) {
                    for (int i3 = 0; i3 < 20; i3++) {
                        sourceContext.collect(new Tuple3(Integer.valueOf(indexOfThisSubtask), Integer.valueOf(i3), "topic-" + i2));
                    }
                }
            }

            public void cancel() {
            }
        });
        Tuple2WithTopicSchema tuple2WithTopicSchema = new Tuple2WithTopicSchema(executionEnvironment.getConfig());
        Properties properties = new Properties();
        properties.putAll(standardProps);
        properties.putAll(secureProps);
        kafkaServer.produceIntoKafka(addSource, "dummy", tuple2WithTopicSchema, properties, null);
        executionEnvironment.execute("Write to topics");
        StreamExecutionEnvironment executionEnvironment2 = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment2.getConfig().disableSysoutLogging();
        executionEnvironment2.addSource(kafkaServer.getConsumer(arrayList, tuple2WithTopicSchema, properties)).flatMap(new FlatMapFunction<Tuple3<Integer, Integer, String>, Integer>() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.13
            Map<String, Integer> countPerTopic = new HashMap(5);

            /* JADX WARN: Multi-variable type inference failed */
            public void flatMap(Tuple3<Integer, Integer, String> tuple3, Collector<Integer> collector) throws Exception {
                Integer num = this.countPerTopic.get(tuple3.f2);
                this.countPerTopic.put(tuple3.f2, num == null ? 1 : Integer.valueOf(num.intValue() + 1));
                for (Map.Entry<String, Integer> entry : this.countPerTopic.entrySet()) {
                    if (entry.getValue().intValue() < 20) {
                        break;
                    } else if (entry.getValue().intValue() > 20) {
                        throw new RuntimeException("There is a failure in the test. I've read " + entry.getValue() + " from topic " + entry.getKey());
                    }
                }
                throw new SuccessException();
            }

            public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
                flatMap((Tuple3<Integer, Integer, String>) obj, (Collector<Integer>) collector);
            }
        }).setParallelism(1);
        TestUtils.tryExecute(executionEnvironment2, "Count elements from the topics");
        for (int i2 = 0; i2 < 5; i2++) {
            deleteTestTopic("topic-" + i2);
        }
    }

    public void runBigRecordTestTopology() throws Exception {
        createTestTopic("bigRecordTestTopic", 1, 1);
        TypeInformationSerializationSchema typeInformationSerializationSchema = new TypeInformationSerializationSchema(TypeInfoParser.parse("Tuple2<Long, byte[]>"), new ExecutionConfig());
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
        executionEnvironment.getConfig().disableSysoutLogging();
        executionEnvironment.enableCheckpointing(100L);
        executionEnvironment.setParallelism(1);
        Properties properties = new Properties();
        properties.putAll(standardProps);
        properties.setProperty("fetch.message.max.bytes", Integer.toString(14680064));
        properties.setProperty("max.partition.fetch.bytes", Integer.toString(14680064));
        properties.setProperty("queued.max.message.chunks", "1");
        properties.putAll(secureProps);
        executionEnvironment.addSource(kafkaServer.getConsumer("bigRecordTestTopic", (DeserializationSchema) typeInformationSerializationSchema, properties)).addSink(new SinkFunction<Tuple2<Long, byte[]>>() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.14
            private int elCnt = 0;

            public void invoke(Tuple2<Long, byte[]> tuple2) throws Exception {
                this.elCnt++;
                if (((Long) tuple2.f0).longValue() == -1) {
                    if (this.elCnt != 11) {
                        throw new RuntimeException("There have been " + this.elCnt + " elements");
                    }
                    throw new SuccessException();
                }
                if (this.elCnt > 10) {
                    throw new RuntimeException("More than 10 elements seen: " + this.elCnt);
                }
            }
        });
        Properties properties2 = new Properties();
        properties2.setProperty("max.request.size", Integer.toString(15728640));
        properties2.setProperty("retries", "3");
        properties2.putAll(secureProps);
        properties2.setProperty("bootstrap.servers", brokerConnectionStrings);
        kafkaServer.produceIntoKafka(executionEnvironment.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.15
            private boolean running;

            public void open(Configuration configuration) throws Exception {
                super.open(configuration);
                this.running = true;
            }

            public void run(SourceFunction.SourceContext<Tuple2<Long, byte[]>> sourceContext) throws Exception {
                Random random = new Random();
                long j = 0;
                while (this.running) {
                    long j2 = j;
                    j = j2 + 1;
                    new Tuple2(Long.valueOf(j2), new byte[7340032 + random.nextInt(7340032)]);
                    sourceContext.collect(sourceContext);
                    Thread.sleep(100L);
                    if (j == 10) {
                        sourceContext.collect(new Tuple2(-1L, new byte[]{1}));
                        return;
                    }
                }
            }

            public void cancel() {
                this.running = false;
            }
        }), "bigRecordTestTopic", new KeyedSerializationSchemaWrapper(typeInformationSerializationSchema), properties2, null);
        TestUtils.tryExecute(executionEnvironment, "big topology test");
        deleteTestTopic("bigRecordTestTopic");
    }

    public void runBrokerFailureTest() throws Exception {
        createTestTopic("brokerFailureTestTopic", 2, 2);
        DataGenerators.generateRandomizedIntegerSequence(StreamExecutionEnvironment.getExecutionEnvironment(), kafkaServer, "brokerFailureTestTopic", 2, 1000, true);
        int leaderToShutDown = kafkaServer.getLeaderToShutDown("brokerFailureTestTopic");
        LOG.info("Leader to shutdown {}", Integer.valueOf(leaderToShutDown));
        TypeInformationSerializationSchema typeInformationSerializationSchema = new TypeInformationSerializationSchema(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(2);
        executionEnvironment.enableCheckpointing(500L);
        executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
        executionEnvironment.getConfig().disableSysoutLogging();
        Properties properties = new Properties();
        properties.putAll(standardProps);
        properties.putAll(secureProps);
        executionEnvironment.addSource(kafkaServer.getConsumer("brokerFailureTestTopic", (DeserializationSchema) typeInformationSerializationSchema, properties)).map(new PartitionValidatingMapper(2, 1)).map(new BrokerKillingMapper(leaderToShutDown, 333)).addSink(new ValidatingExactlyOnceSink(2000)).setParallelism(1);
        BrokerKillingMapper.killedLeaderBefore = false;
        TestUtils.tryExecute(executionEnvironment, "Broker failure once test");
        kafkaServer.restartBroker(leaderToShutDown);
    }

    public void runKeyValueTest() throws Exception {
        createTestTopic("keyvaluetest", 1, 1);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
        executionEnvironment.getConfig().disableSysoutLogging();
        DataStreamSource addSource = executionEnvironment.addSource(new SourceFunction<Tuple2<Long, PojoValue>>() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.16
            public void run(SourceFunction.SourceContext<Tuple2<Long, PojoValue>> sourceContext) throws Exception {
                Random random = new Random(1337L);
                long j = 0;
                while (true) {
                    long j2 = j;
                    if (j2 >= 5000) {
                        return;
                    }
                    PojoValue pojoValue = new PojoValue();
                    pojoValue.when = new Date(random.nextLong());
                    pojoValue.lon = random.nextLong();
                    pojoValue.lat = j2;
                    sourceContext.collect(new Tuple2(j2 % 2 == 0 ? null : Long.valueOf(j2), pojoValue));
                    j = j2 + 1;
                }
            }

            public void cancel() {
            }
        });
        TypeInformationKeyValueSerializationSchema typeInformationKeyValueSerializationSchema = new TypeInformationKeyValueSerializationSchema(Long.class, PojoValue.class, executionEnvironment.getConfig());
        Properties propertiesFromBrokerList = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
        propertiesFromBrokerList.setProperty("retries", "3");
        kafkaServer.produceIntoKafka(addSource, "keyvaluetest", typeInformationKeyValueSerializationSchema, propertiesFromBrokerList, null);
        executionEnvironment.execute("Write KV to Kafka");
        StreamExecutionEnvironment executionEnvironment2 = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment2.setParallelism(1);
        executionEnvironment2.setRestartStrategy(RestartStrategies.noRestart());
        executionEnvironment2.getConfig().disableSysoutLogging();
        TypeInformationKeyValueSerializationSchema typeInformationKeyValueSerializationSchema2 = new TypeInformationKeyValueSerializationSchema(Long.class, PojoValue.class, executionEnvironment2.getConfig());
        Properties properties = new Properties();
        properties.putAll(standardProps);
        properties.putAll(secureProps);
        executionEnvironment2.addSource(kafkaServer.getConsumer("keyvaluetest", (KeyedDeserializationSchema) typeInformationKeyValueSerializationSchema2, properties)).flatMap(new RichFlatMapFunction<Tuple2<Long, PojoValue>, Object>() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.17
            long counter = 0;

            public void flatMap(Tuple2<Long, PojoValue> tuple2, Collector<Object> collector) throws Exception {
                Assert.assertTrue("Wrong value " + ((PojoValue) tuple2.f1).lat, ((PojoValue) tuple2.f1).lat == this.counter);
                if (((PojoValue) tuple2.f1).lat % 2 == 0) {
                    Assert.assertNull("key was not null", tuple2.f0);
                } else {
                    Assert.assertTrue("Wrong value " + tuple2.f0, ((Long) tuple2.f0).longValue() == this.counter);
                }
                this.counter++;
                if (this.counter == 5000) {
                    throw new SuccessException();
                }
            }

            public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
                flatMap((Tuple2<Long, PojoValue>) obj, (Collector<Object>) collector);
            }
        });
        TestUtils.tryExecute(executionEnvironment2, "Read KV from Kafka");
        deleteTestTopic("keyvaluetest");
    }

    public void runAllDeletesTest() throws Exception {
        createTestTopic("alldeletestest", 1, 1);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        executionEnvironment.getConfig().setRestartStrategy(RestartStrategies.noRestart());
        executionEnvironment.getConfig().disableSysoutLogging();
        DataStreamSource addSource = executionEnvironment.addSource(new SourceFunction<Tuple2<byte[], PojoValue>>() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.18
            public void run(SourceFunction.SourceContext<Tuple2<byte[], PojoValue>> sourceContext) throws Exception {
                Random random = new Random(1337L);
                long j = 0;
                while (true) {
                    long j2 = j;
                    if (j2 >= 300) {
                        return;
                    }
                    byte[] bArr = new byte[200];
                    random.nextBytes(bArr);
                    sourceContext.collect(new Tuple2(bArr, (PojoValue) null));
                    j = j2 + 1;
                }
            }

            public void cancel() {
            }
        });
        TypeInformationKeyValueSerializationSchema typeInformationKeyValueSerializationSchema = new TypeInformationKeyValueSerializationSchema(byte[].class, PojoValue.class, executionEnvironment.getConfig());
        Properties propertiesFromBrokerList = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
        propertiesFromBrokerList.setProperty("retries", "3");
        propertiesFromBrokerList.putAll(secureProps);
        kafkaServer.produceIntoKafka(addSource, "alldeletestest", typeInformationKeyValueSerializationSchema, propertiesFromBrokerList, null);
        executionEnvironment.execute("Write deletes to Kafka");
        StreamExecutionEnvironment executionEnvironment2 = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment2.setParallelism(1);
        executionEnvironment2.getConfig().setRestartStrategy(RestartStrategies.noRestart());
        executionEnvironment2.getConfig().disableSysoutLogging();
        Properties properties = new Properties();
        properties.putAll(standardProps);
        properties.putAll(secureProps);
        executionEnvironment2.addSource(kafkaServer.getConsumer("alldeletestest", (KeyedDeserializationSchema) typeInformationKeyValueSerializationSchema, properties)).flatMap(new RichFlatMapFunction<Tuple2<byte[], PojoValue>, Object>() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.19
            long counter = 0;

            public void flatMap(Tuple2<byte[], PojoValue> tuple2, Collector<Object> collector) throws Exception {
                Assert.assertNull(tuple2.f1);
                this.counter++;
                if (this.counter == 300) {
                    throw new SuccessException();
                }
            }

            public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
                flatMap((Tuple2<byte[], PojoValue>) obj, (Collector<Object>) collector);
            }
        });
        TestUtils.tryExecute(executionEnvironment2, "Read deletes from Kafka");
        deleteTestTopic("alldeletestest");
    }

    public void runEndOfStreamTest() throws Exception {
        String writeSequence = writeSequence("testEndOfStream", 300, 1, 1);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        executionEnvironment.getConfig().setRestartStrategy(RestartStrategies.noRestart());
        executionEnvironment.getConfig().disableSysoutLogging();
        Properties properties = new Properties();
        properties.putAll(standardProps);
        properties.putAll(secureProps);
        executionEnvironment.addSource(kafkaServer.getConsumer(writeSequence, new FixedNumberDeserializationSchema(300), properties)).flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.20
            public void flatMap(Tuple2<Integer, Integer> tuple2, Collector<Void> collector) throws Exception {
            }

            public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
                flatMap((Tuple2<Integer, Integer>) obj, (Collector<Void>) collector);
            }
        });
        TestUtils.tryExecute(executionEnvironment, "Consume 300 elements from Kafka");
        deleteTestTopic(writeSequence);
    }

    public void runMetricsTest() throws Throwable {
        createTestTopic("metricsStream", 5, 1);
        final Tuple1 tuple1 = new Tuple1((Object) null);
        Thread thread = new Thread(new Runnable() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.21
            @Override // java.lang.Runnable
            public void run() {
                try {
                    StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
                    executionEnvironment.setParallelism(1);
                    executionEnvironment.getConfig().setRestartStrategy(RestartStrategies.noRestart());
                    executionEnvironment.getConfig().disableSysoutLogging();
                    executionEnvironment.disableOperatorChaining();
                    Properties properties = new Properties();
                    properties.putAll(KafkaTestBase.standardProps);
                    properties.putAll(KafkaTestBase.secureProps);
                    TypeInformationSerializationSchema typeInformationSerializationSchema = new TypeInformationSerializationSchema(TypeInfoParser.parse("Tuple2<Integer, Integer>"), executionEnvironment.getConfig());
                    executionEnvironment.addSource(KafkaTestBase.kafkaServer.getConsumer("metricsStream", (DeserializationSchema) typeInformationSerializationSchema, KafkaTestBase.standardProps)).flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.21.1
                        public void flatMap(Tuple2<Integer, Integer> tuple2, Collector<Void> collector) throws Exception {
                        }

                        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
                            flatMap((Tuple2<Integer, Integer>) obj, (Collector<Void>) collector);
                        }
                    });
                    KafkaTestBase.kafkaServer.produceIntoKafka(executionEnvironment.addSource(new RichSourceFunction<Tuple2<Integer, Integer>>() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.21.2
                        boolean running = true;

                        public void run(SourceFunction.SourceContext<Tuple2<Integer, Integer>> sourceContext) throws Exception {
                            int i = 0;
                            while (this.running) {
                                int i2 = i;
                                i++;
                                sourceContext.collect(Tuple2.of(Integer.valueOf(i2), Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask())));
                                Thread.sleep(1L);
                            }
                        }

                        public void cancel() {
                            this.running = false;
                        }
                    }), "metricsStream", new KeyedSerializationSchemaWrapper(typeInformationSerializationSchema), KafkaTestBase.standardProps, null);
                    executionEnvironment.execute("Metrics test job");
                } catch (Throwable th) {
                    KafkaTestBase.LOG.warn("Got exception during execution", th);
                    if (th instanceof JobCancellationException) {
                        return;
                    }
                    tuple1.f0 = th;
                }
            }
        });
        thread.start();
        try {
            MBeanServer platformMBeanServer = ManagementFactory.getPlatformMBeanServer();
            Set queryNames = platformMBeanServer.queryNames(new ObjectName("*current-offsets*:*"), (QueryExp) null);
            while (queryNames.size() < 5) {
                if (tuple1.f0 != null) {
                    throw ((Throwable) tuple1.f0);
                }
                queryNames = platformMBeanServer.queryNames(new ObjectName("*current-offsets*:*"), (QueryExp) null);
                Thread.sleep(50L);
            }
            Assert.assertEquals(5L, queryNames.size());
            while (true) {
                int i = 0;
                Iterator it = queryNames.iterator();
                while (it.hasNext()) {
                    if (((Long) platformMBeanServer.getAttribute((ObjectName) it.next(), "Value")).longValue() >= 0) {
                        i++;
                    }
                }
                if (i == 5) {
                    break;
                } else {
                    Thread.sleep(50L);
                }
            }
            Assert.assertTrue("No producer metrics found", platformMBeanServer.queryNames(new ObjectName("*KafkaProducer*:*"), (QueryExp) null).size() > 30);
            LOG.info("Found all JMX metrics. Cancelling job.");
            JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
            while (thread.isAlive()) {
                Thread.sleep(50L);
            }
            if (tuple1.f0 != null) {
                throw ((Throwable) tuple1.f0);
            }
            deleteTestTopic("metricsStream");
        } catch (Throwable th) {
            JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
            throw th;
        }
    }

    protected void readSequence(StreamExecutionEnvironment streamExecutionEnvironment, StartupMode startupMode, Map<KafkaTopicPartition, Long> map, Properties properties, String str, final Map<Integer, Tuple2<Integer, Integer>> map2) throws Exception {
        int size = map2.keySet().size();
        int i = 0;
        Iterator<Map.Entry<Integer, Tuple2<Integer, Integer>>> it = map2.entrySet().iterator();
        while (it.hasNext()) {
            i += ((Integer) it.next().getValue().f0).intValue();
        }
        final int i2 = i;
        TypeInformationSerializationSchema typeInformationSerializationSchema = new TypeInformationSerializationSchema(TypeInfoParser.parse("Tuple2<Integer, Integer>"), streamExecutionEnvironment.getConfig());
        properties.putAll(secureProps);
        FlinkKafkaConsumerBase consumer = kafkaServer.getConsumer(str, (DeserializationSchema) typeInformationSerializationSchema, properties);
        switch (AnonymousClass27.$SwitchMap$org$apache$flink$streaming$connectors$kafka$config$StartupMode[startupMode.ordinal()]) {
            case 1:
                consumer.setStartFromEarliest();
                break;
            case 2:
                consumer.setStartFromLatest();
                break;
            case 3:
                consumer.setStartFromSpecificOffsets(map);
                break;
            case 4:
                consumer.setStartFromGroupOffsets();
                break;
        }
        streamExecutionEnvironment.addSource(consumer).setParallelism(size).map(new ThrottledMapper(20)).setParallelism(size).flatMap(new RichFlatMapFunction<Tuple2<Integer, Integer>, Integer>() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.22
            private HashMap<Integer, BitSet> partitionsToValueCheck;
            private int count = 0;

            public void open(Configuration configuration) throws Exception {
                this.partitionsToValueCheck = new HashMap<>();
                Iterator it2 = map2.keySet().iterator();
                while (it2.hasNext()) {
                    this.partitionsToValueCheck.put((Integer) it2.next(), new BitSet());
                }
            }

            public void flatMap(Tuple2<Integer, Integer> tuple2, Collector<Integer> collector) throws Exception {
                int intValue = ((Integer) tuple2.f0).intValue();
                int intValue2 = ((Integer) tuple2.f1).intValue();
                BitSet bitSet = this.partitionsToValueCheck.get(Integer.valueOf(intValue));
                if (bitSet == null) {
                    throw new RuntimeException("Got a record from an unknown partition");
                }
                bitSet.set(intValue2 - ((Integer) ((Tuple2) map2.get(Integer.valueOf(intValue))).f1).intValue());
                this.count++;
                KafkaTestBase.LOG.info("Received message {}, total {} messages", tuple2, Integer.valueOf(this.count));
                if (this.count == i2) {
                    for (Map.Entry<Integer, BitSet> entry : this.partitionsToValueCheck.entrySet()) {
                        BitSet value = entry.getValue();
                        int intValue3 = ((Integer) ((Tuple2) map2.get(entry.getKey())).f0).intValue();
                        if (value.cardinality() != intValue3) {
                            throw new RuntimeException("Expected cardinality to be " + intValue3 + ", but was " + value.cardinality());
                        }
                        if (value.nextClearBit(0) != intValue3) {
                            throw new RuntimeException("Expected next clear bit to be " + intValue3 + ", but was " + value.cardinality());
                        }
                    }
                    throw new SuccessException();
                }
            }

            public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
                flatMap((Tuple2<Integer, Integer>) obj, (Collector<Integer>) collector);
            }
        }).setParallelism(1);
        TestUtils.tryExecute(streamExecutionEnvironment, "Read data from Kafka");
        LOG.info("Successfully read sequence for verification");
    }

    protected void readSequence(StreamExecutionEnvironment streamExecutionEnvironment, StartupMode startupMode, Map<KafkaTopicPartition, Long> map, Properties properties, int i, String str, int i2, int i3) throws Exception {
        HashMap hashMap = new HashMap();
        for (int i4 = 0; i4 < i; i4++) {
            hashMap.put(Integer.valueOf(i4), new Tuple2(Integer.valueOf(i2), Integer.valueOf(i3)));
        }
        readSequence(streamExecutionEnvironment, startupMode, map, properties, str, hashMap);
    }

    /* JADX WARN: Type inference failed for: r0v65, types: [long, java.util.concurrent.atomic.AtomicReference] */
    protected String writeSequence(String str, final int i, final int i2, int i3) throws Exception {
        boolean z;
        LOG.info("\n===================================\n== Writing sequence of " + i + " into " + str + " with p=" + i2 + "\n===================================");
        TypeInformation of = TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.23
        });
        KeyedSerializationSchemaWrapper keyedSerializationSchemaWrapper = new KeyedSerializationSchemaWrapper(new TypeInformationSerializationSchema(of, new ExecutionConfig()));
        KeyedDeserializationSchemaWrapper keyedDeserializationSchemaWrapper = new KeyedDeserializationSchemaWrapper(new TypeInformationSerializationSchema(of, new ExecutionConfig()));
        for (int i4 = 1; i4 <= 10; i4++) {
            String str2 = str + '-' + i4;
            LOG.info("Writing attempt #1");
            createTestTopic(str2, i2, i3);
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.getConfig().setRestartStrategy(RestartStrategies.noRestart());
            executionEnvironment.getConfig().disableSysoutLogging();
            DataStreamSource parallelism = executionEnvironment.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.24
                private boolean running = true;

                public void run(SourceFunction.SourceContext<Tuple2<Integer, Integer>> sourceContext) throws Exception {
                    int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                    for (int i5 = 0; this.running && i5 < i; i5++) {
                        sourceContext.collect(new Tuple2(Integer.valueOf(indexOfThisSubtask), Integer.valueOf(i5)));
                    }
                }

                public void cancel() {
                    this.running = false;
                }
            }).setParallelism(i2);
            Properties propertiesFromBrokerList = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
            propertiesFromBrokerList.setProperty("retries", "0");
            propertiesFromBrokerList.putAll(secureProps);
            kafkaServer.produceIntoKafka(parallelism, str2, keyedSerializationSchemaWrapper, propertiesFromBrokerList, new Tuple2FlinkPartitioner(i2)).setParallelism(i2);
            try {
                executionEnvironment.execute("Write sequence");
                LOG.info("Finished writing sequence");
                LOG.info("Validating sequence");
                JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
                final StreamExecutionEnvironment executionEnvironment2 = StreamExecutionEnvironment.getExecutionEnvironment();
                executionEnvironment2.getConfig().setRestartStrategy(RestartStrategies.noRestart());
                executionEnvironment2.getConfig().disableSysoutLogging();
                executionEnvironment2.setParallelism(i2);
                Properties properties = (Properties) standardProps.clone();
                properties.setProperty("group.id", "flink-tests-validator");
                properties.putAll(secureProps);
                executionEnvironment2.addSource(kafkaServer.getConsumer(str2, (KeyedDeserializationSchema) keyedDeserializationSchemaWrapper, properties)).map(new RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.25
                    private final int totalCount;
                    private int count = 0;

                    {
                        this.totalCount = i2 * i;
                    }

                    public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> tuple2) throws Exception {
                        int i5 = this.count + 1;
                        this.count = i5;
                        if (i5 == this.totalCount) {
                            throw new SuccessException();
                        }
                        return tuple2;
                    }
                }).setParallelism(1).addSink(new DiscardingSink()).setParallelism(1);
                final ?? atomicReference = new AtomicReference();
                Thread thread = new Thread() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.26
                    @Override // java.lang.Thread, java.lang.Runnable
                    public void run() {
                        try {
                            TestUtils.tryExecute(executionEnvironment2, "sequence validation");
                        } catch (Throwable th) {
                            atomicReference.set(th);
                        }
                    }
                };
                thread.start();
                long nanoTime = System.nanoTime() + 10000000000L;
                while (thread.isAlive() && nanoTime - System.nanoTime() > 0) {
                    thread.join(atomicReference / 1000000);
                }
                if (thread.isAlive()) {
                    z = false;
                    JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
                } else {
                    Throwable th = (Throwable) atomicReference.get();
                    if (th != null) {
                        z = false;
                        LOG.info("Attempt " + i4 + " failed with exception", th);
                    } else {
                        z = true;
                    }
                }
                JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
            } catch (Exception e) {
                LOG.error("Write attempt failed, trying again", e);
                deleteTestTopic(str2);
                JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
            }
            if (z) {
                return str2;
            }
            deleteTestTopic(str2);
        }
        throw new Exception("Could not write a valid sequence to Kafka after 10 attempts");
    }

    private static List<MessageAndMetadata<byte[], byte[]>> readTopicToList(String str, ConsumerConfig consumerConfig, int i) {
        Map createMessageStreams = Consumer.createJavaConsumerConnector(consumerConfig).createMessageStreams(Collections.singletonMap(str, 1));
        if (createMessageStreams.size() != 1) {
            throw new RuntimeException("Expected only one message stream but got " + createMessageStreams.size());
        }
        List list = (List) createMessageStreams.get(str);
        if (list == null) {
            throw new RuntimeException("Requested stream not available. Available streams: " + createMessageStreams.toString());
        }
        if (list.size() != 1) {
            throw new RuntimeException("Requested 1 stream from Kafka, bot got " + list.size() + " streams");
        }
        LOG.info("Opening Consumer instance for topic '{}' on group '{}'", str, consumerConfig.groupId());
        ConsumerIterator it = ((KafkaStream) list.get(0)).iterator();
        ArrayList arrayList = new ArrayList();
        int i2 = 0;
        while (it.hasNext()) {
            i2++;
            arrayList.add(it.next());
            if (i2 == i) {
                LOG.info("Read " + i2 + " elements");
                return arrayList;
            }
        }
        return arrayList;
    }

    private static void printTopic(String str, ConsumerConfig consumerConfig, DeserializationSchema<?> deserializationSchema, int i) throws IOException {
        List<MessageAndMetadata<byte[], byte[]>> readTopicToList = readTopicToList(str, consumerConfig, i);
        LOG.info("Printing contents of topic {} in consumer grouo {}", str, consumerConfig.groupId());
        for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : readTopicToList) {
            LOG.info("Message: partition: {} offset: {} msg: {}", new Object[]{Integer.valueOf(messageAndMetadata.partition()), Long.valueOf(messageAndMetadata.offset()), deserializationSchema.deserialize((byte[]) messageAndMetadata.message()).toString()});
        }
    }

    private static void printTopic(String str, int i, DeserializationSchema<?> deserializationSchema) throws IOException {
        Properties properties = new Properties(standardProps);
        properties.setProperty("group.id", "topic-printer" + UUID.randomUUID().toString());
        properties.setProperty("auto.offset.reset", "smallest");
        properties.setProperty("zookeeper.connect", standardProps.getProperty("zookeeper.connect"));
        properties.putAll(secureProps);
        printTopic(str, new ConsumerConfig(properties), deserializationSchema, i);
    }
}
