/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.connectors.kafka.KafkaTestBase;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;

public abstract class KafkaProducerTestBase
extends KafkaTestBase {
    /**
     * Runs a streaming job that writes an unbounded sequence of {@code (counter, "kafka-" + counter)}
     * tuples through a single Kafka producer into two topics and then reads both topics back to
     * validate the partitioning.
     *
     * <p>Steps performed by this method:
     * <ol>
     *   <li>Creates {@code defaultTopic} (2 partitions) and {@code dynamicTopic} (3 partitions).</li>
     *   <li>Produces the tuples using a {@link CustomKeyedSerializationSchemaWrapper} to pick the
     *       target topic per record and a {@link CustomPartitioner} to pick the partition.</li>
     *   <li>Consumes each topic with one subtask per partition and pipes the records through a
     *       {@link PartitionValidatingMapper} into a {@link PartitionValidatingSink}.</li>
     *   <li>Deletes both topics after the job has been executed.</li>
     * </ol>
     *
     * <p>Any exception raised along the way is logged and rethrown as an {@link AssertionError}
     * that carries the original exception as its cause.
     */
    public void runCustomPartitioningTest() {
        try {
            LOG.info("Starting KafkaProducerITCase.testCustomPartitioning()");

            final String defaultTopic = "defaultTopic";
            final int defaultTopicPartitions = 2;

            final String dynamicTopic = "dynamicTopic";
            final int dynamicTopicPartitions = 3;

            createTestTopic(defaultTopic, defaultTopicPartitions, 1);
            createTestTopic(dynamicTopic, dynamicTopicPartitions, 1);

            // Handed to the partitioner so it can assert the partition count it is given per topic.
            Map<String, Integer> expectedTopicsToNumPartitions = new HashMap<String, Integer>(2);
            expectedTopicsToNumPartitions.put(defaultTopic, defaultTopicPartitions);
            expectedTopicsToNumPartitions.put(dynamicTopic, dynamicTopicPartitions);

            TypeInformation<Tuple2<Long, String>> longStringInfo = TypeInfoParser.parse("Tuple2<Long, String>");

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setRestartStrategy(RestartStrategies.noRestart());
            env.getConfig().disableSysoutLogging();

            TypeInformationSerializationSchema<Tuple2<Long, String>> serSchema =
                    new TypeInformationSerializationSchema<Tuple2<Long, String>>(longStringInfo, env.getConfig());
            TypeInformationSerializationSchema<Tuple2<Long, String>> deserSchema =
                    new TypeInformationSerializationSchema<Tuple2<Long, String>>(longStringInfo, env.getConfig());

            // ------ producing topology ---------

            // Emits (0, "kafka-0"), (1, "kafka-1"), ... until cancelled.
            DataStreamSource<Tuple2<Long, String>> stream = env
                    .addSource(new SourceFunction<Tuple2<Long, String>>() {

                        // volatile: cancel() is invoked from a different thread than run(),
                        // so the write must be visible to the emitting loop.
                        private volatile boolean running = true;

                        @Override
                        public void run(SourceFunction.SourceContext<Tuple2<Long, String>> ctx) throws Exception {
                            long cnt = 0L;
                            while (running) {
                                ctx.collect(new Tuple2<Long, String>(cnt, "kafka-" + cnt));
                                cnt++;
                            }
                        }

                        @Override
                        public void cancel() {
                            running = false;
                        }
                    })
                    .setParallelism(1);

            Properties props = new Properties();
            props.putAll(FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings));
            props.putAll(secureProps);

            // Sink parallelism is the larger of the two partition counts.
            kafkaServer.produceIntoKafka(
                            stream,
                            defaultTopic,
                            new CustomKeyedSerializationSchemaWrapper(serSchema, defaultTopic, dynamicTopic),
                            props,
                            new CustomPartitioner(expectedTopicsToNumPartitions))
                    .setParallelism(Math.max(defaultTopicPartitions, dynamicTopicPartitions));

            // ------ consuming topology ---------

            Properties consumerProps = new Properties();
            consumerProps.putAll(standardProps);
            consumerProps.putAll(secureProps);

            FlinkKafkaConsumerBase<Tuple2<Long, String>> defaultTopicSource =
                    kafkaServer.getConsumer(defaultTopic, deserSchema, consumerProps);
            FlinkKafkaConsumerBase<Tuple2<Long, String>> dynamicTopicSource =
                    kafkaServer.getConsumer(dynamicTopic, deserSchema, consumerProps);

            // One source/mapper subtask per partition; a single sink subtask aggregates the counts.
            env.addSource(defaultTopicSource).setParallelism(defaultTopicPartitions)
                    .map(new PartitionValidatingMapper(defaultTopicPartitions)).setParallelism(defaultTopicPartitions)
                    .addSink(new PartitionValidatingSink(defaultTopicPartitions)).setParallelism(1);

            env.addSource(dynamicTopicSource).setParallelism(dynamicTopicPartitions)
                    .map(new PartitionValidatingMapper(dynamicTopicPartitions)).setParallelism(dynamicTopicPartitions)
                    .addSink(new PartitionValidatingSink(dynamicTopicPartitions)).setParallelism(1);

            // NOTE(review): the sinks signal completion by throwing SuccessException; tryExecute is
            // presumably what turns that into a normal return - confirm against TestUtils.
            TestUtils.tryExecute(env, "custom partitioning test");

            deleteTestTopic(defaultTopic);
            deleteTestTopic(dynamicTopic);

            LOG.info("Finished KafkaProducerITCase.testCustomPartitioning()");
        }
        catch (Exception e) {
            // Keep the original exception attached so the test report shows the root cause.
            LOG.error("Custom partitioning test failed", e);
            throw new AssertionError(e.getMessage(), e);
        }
    }

    /**
     * Sink that counts incoming partition indices and throws a {@link SuccessException} as soon as
     * every partition index has been seen at least 100 times.
     */
    public static class PartitionValidatingSink
    implements SinkFunction<Integer> {
        /** Number of records each partition must have delivered before success is signalled. */
        private static final int REQUIRED_ELEMENTS_PER_PARTITION = 100;

        /** Per-partition record counters, indexed by the partition index received in {@link #invoke}. */
        private final int[] valuesPerPartition;

        /**
         * @param numPartitions number of partition indices to track (size of the counter array)
         */
        public PartitionValidatingSink(int numPartitions) {
            this.valuesPerPartition = new int[numPartitions];
        }

        /**
         * Increments the counter for the given partition index and checks whether all counters
         * have reached the required threshold.
         *
         * @param value partition index of the record; used directly as an array index
         * @throws SuccessException once no counter is below the threshold
         */
        public void invoke(Integer value) throws Exception {
            final int partitionIndex = value;
            valuesPerPartition[partitionIndex]++;

            // Bail out quietly while at least one partition is still short of the threshold.
            for (int seen : valuesPerPartition) {
                if (seen < REQUIRED_ELEMENTS_PER_PARTITION) {
                    return;
                }
            }

            throw new SuccessException();
        }
    }

    public static class PartitionValidatingMapper
    extends RichMapFunction<Tuple2<Long, String>, Integer> {
        private final int numPartitions;
        private int ourPartition = -1;

        public PartitionValidatingMapper(int numPartitions) {
            this.numPartitions = numPartitions;
        }

        public Integer map(Tuple2<Long, String> value) throws Exception {
            int partition = ((Long)value.f0).intValue() % this.numPartitions;
            if (this.ourPartition != -1) {
                Assert.assertEquals((String)"inconsistent partitioning", (long)this.ourPartition, (long)partition);
            } else {
                this.ourPartition = partition;
            }
            return partition;
        }
    }

    /**
     * Serialization schema wrapper that routes each record to one of two topics depending on
     * whether its counter ({@code f0}) is even or odd.
     */
    public static class CustomKeyedSerializationSchemaWrapper
    extends KeyedSerializationSchemaWrapper<Tuple2<Long, String>> {
        /** Target topic for records with an even counter. */
        private final String defaultTopic;

        /** Target topic for all other records. */
        private final String dynamicTopic;

        /**
         * @param serializationSchema schema passed on to the wrapped superclass
         * @param defaultTopic topic for even counters; must not be {@code null}
         * @param dynamicTopic topic for the remaining counters; must not be {@code null}
         */
        public CustomKeyedSerializationSchemaWrapper(SerializationSchema<Tuple2<Long, String>> serializationSchema, String defaultTopic, String dynamicTopic) {
            super(serializationSchema);
            this.defaultTopic = Preconditions.checkNotNull(defaultTopic);
            this.dynamicTopic = Preconditions.checkNotNull(dynamicTopic);
        }

        /**
         * Selects the topic for the given record.
         *
         * @param element record whose {@code f0} field decides the topic
         * @return {@code defaultTopic} when {@code f0 % 2 == 0}, otherwise {@code dynamicTopic}
         */
        public String getTargetTopic(Tuple2<Long, String> element) {
            final long remainder = element.f0 % 2L;
            if (remainder == 0L) {
                return defaultTopic;
            }
            return dynamicTopic;
        }
    }

    /**
     * Partitioner that asserts the number of partitions it is handed for a topic and then spreads
     * records by their counter ({@code f0}) modulo that number.
     */
    public static class CustomPartitioner
    extends FlinkKafkaPartitioner<Tuple2<Long, String>>
    implements Serializable {
        /** Expected partition count per topic name, as supplied to the constructor. */
        private final Map<String, Integer> expectedTopicsToNumPartitions;

        /**
         * @param expectedTopicsToNumPartitions topic name to expected number of partitions
         */
        public CustomPartitioner(Map<String, Integer> expectedTopicsToNumPartitions) {
            this.expectedTopicsToNumPartitions = expectedTopicsToNumPartitions;
        }

        /**
         * Checks the partition count for {@code topic} and computes the target for the record.
         *
         * @param next record whose {@code f0} field is used for partitioning
         * @param serializedKey not used by this implementation
         * @param serializedValue not used by this implementation
         * @param topic topic the record is written to; looked up in the expectation map
         * @param partitions partitions of the topic; only its length is used
         * @return {@code f0 % partitions.length}, narrowed to {@code int}
         * @throws AssertionError if {@code partitions.length} differs from the expected count
         */
        public int partition(Tuple2<Long, String> next, byte[] serializedKey, byte[] serializedValue, String topic, int[] partitions) {
            final long expectedCount = expectedTopicsToNumPartitions.get(topic).intValue();
            final long actualCount = partitions.length;
            Assert.assertEquals(expectedCount, actualCount);

            final long target = next.f0 % actualCount;
            return (int) target;
        }
    }
}

