package org.apache.flink.streaming.connectors.kafka;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
import org.apache.flink.streaming.connectors.kafka.testutils.IntegerSource;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.class */
public abstract class KafkaProducerTestBase extends KafkaTestBaseWithFlink {
    private static final long KAFKA_READ_TIMEOUT = 60000;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase$BrokerRestartingMapper.class */
    public static class BrokerRestartingMapper<T> extends RichMapFunction<T, T> implements CheckpointedFunction, CheckpointListener {
        private static final long serialVersionUID = 6334389850158707313L;
        public static volatile boolean triggeredShutdown;
        public static volatile int lastSnapshotedElementBeforeShutdown;
        public static volatile Runnable shutdownAction;
        private final int failCount;
        private int numElementsTotal;
        private boolean failer;

        public static void resetState(Runnable runnable) {
            triggeredShutdown = false;
            lastSnapshotedElementBeforeShutdown = 0;
            shutdownAction = runnable;
        }

        public BrokerRestartingMapper(int i) {
            this.failCount = i;
        }

        public void open(Configuration configuration) {
            this.failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
        }

        public T map(T t) throws Exception {
            this.numElementsTotal++;
            Thread.sleep(10L);
            if (!triggeredShutdown && this.failer && this.numElementsTotal >= this.failCount) {
                triggeredShutdown = true;
                shutdownAction.run();
            }
            return t;
        }

        public void notifyCheckpointComplete(long j) {
        }

        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
            if (triggeredShutdown) {
                return;
            }
            lastSnapshotedElementBeforeShutdown = this.numElementsTotal;
        }

        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase$CustomKeyedSerializationSchemaWrapper.class */
    public static class CustomKeyedSerializationSchemaWrapper extends KeyedSerializationSchemaWrapper<Tuple2<Long, String>> {
        private final String defaultTopic;
        private final String dynamicTopic;

        public CustomKeyedSerializationSchemaWrapper(SerializationSchema<Tuple2<Long, String>> serializationSchema, String str, String str2) {
            super(serializationSchema);
            this.defaultTopic = (String) Preconditions.checkNotNull(str);
            this.dynamicTopic = (String) Preconditions.checkNotNull(str2);
        }

        public String getTargetTopic(Tuple2<Long, String> tuple2) {
            return ((Long) tuple2.f0).longValue() % 2 == 0 ? this.defaultTopic : this.dynamicTopic;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase$CustomPartitioner.class */
    private static class CustomPartitioner extends FlinkKafkaPartitioner<Tuple2<Long, String>> implements Serializable {
        private final Map<String, Integer> expectedTopicsToNumPartitions;

        public CustomPartitioner(Map<String, Integer> map) {
            this.expectedTopicsToNumPartitions = map;
        }

        public int partition(Tuple2<Long, String> tuple2, byte[] bArr, byte[] bArr2, String str, int[] iArr) {
            Assert.assertEquals(this.expectedTopicsToNumPartitions.get(str).intValue(), iArr.length);
            return (int) (((Long) tuple2.f0).longValue() % iArr.length);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase$PartitionValidatingMapper.class */
    public static class PartitionValidatingMapper extends RichMapFunction<Tuple2<Long, String>, Integer> {
        private final int numPartitions;
        private int ourPartition = -1;

        public PartitionValidatingMapper(int i) {
            this.numPartitions = i;
        }

        public Integer map(Tuple2<Long, String> tuple2) throws Exception {
            int intValue = ((Long) tuple2.f0).intValue() % this.numPartitions;
            if (this.ourPartition != -1) {
                Assert.assertEquals("inconsistent partitioning", this.ourPartition, intValue);
            } else {
                this.ourPartition = intValue;
            }
            return Integer.valueOf(intValue);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase$PartitionValidatingSink.class */
    public static class PartitionValidatingSink implements SinkFunction<Integer> {
        private final int[] valuesPerPartition;

        public PartitionValidatingSink(int i) {
            this.valuesPerPartition = new int[i];
        }

        public void invoke(Integer num) throws Exception {
            int[] iArr = this.valuesPerPartition;
            int intValue = num.intValue();
            iArr[intValue] = iArr[intValue] + 1;
            boolean z = false;
            int[] iArr2 = this.valuesPerPartition;
            int length = iArr2.length;
            int i = 0;
            while (true) {
                if (i >= length) {
                    break;
                }
                if (iArr2[i] < 100) {
                    z = true;
                    break;
                }
                i++;
            }
            if (!z) {
                throw new SuccessException();
            }
        }
    }

    @Test
    public void testCustomPartitioning() {
        try {
            LOG.info("Starting KafkaProducerITCase.testCustomPartitioning()");
            createTestTopic("defaultTopic", 2, 1);
            createTestTopic("dynamicTopic", 3, 1);
            HashMap hashMap = new HashMap(2);
            hashMap.put("defaultTopic", 2);
            hashMap.put("dynamicTopic", 3);
            TypeInformation of = TypeInformation.of(new TypeHint<Tuple2<Long, String>>() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.1
            });
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
            executionEnvironment.getConfig().disableSysoutLogging();
            TypeInformationSerializationSchema typeInformationSerializationSchema = new TypeInformationSerializationSchema(of, executionEnvironment.getConfig());
            TypeInformationSerializationSchema typeInformationSerializationSchema2 = new TypeInformationSerializationSchema(of, executionEnvironment.getConfig());
            DataStreamSource parallelism = executionEnvironment.addSource(new SourceFunction<Tuple2<Long, String>>() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.2
                private boolean running = true;

                public void run(SourceFunction.SourceContext<Tuple2<Long, String>> sourceContext) throws Exception {
                    long j = 0;
                    while (this.running) {
                        sourceContext.collect(new Tuple2(Long.valueOf(j), "kafka-" + j));
                        j++;
                        if (j % 100 == 0) {
                            Thread.sleep(1L);
                        }
                    }
                }

                public void cancel() {
                    this.running = false;
                }
            }).setParallelism(1);
            Properties properties = new Properties();
            properties.putAll(FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings));
            properties.putAll(secureProps);
            kafkaServer.produceIntoKafka(parallelism, "defaultTopic", new CustomKeyedSerializationSchemaWrapper(typeInformationSerializationSchema, "defaultTopic", "dynamicTopic"), properties, new CustomPartitioner(hashMap)).setParallelism(Math.max(2, 3));
            Properties properties2 = new Properties();
            properties2.putAll(standardProps);
            properties2.putAll(secureProps);
            FlinkKafkaConsumerBase consumer = kafkaServer.getConsumer("defaultTopic", (DeserializationSchema) typeInformationSerializationSchema2, properties2);
            FlinkKafkaConsumerBase consumer2 = kafkaServer.getConsumer("dynamicTopic", (DeserializationSchema) typeInformationSerializationSchema2, properties2);
            executionEnvironment.addSource(consumer).setParallelism(2).map(new PartitionValidatingMapper(2)).setParallelism(2).addSink(new PartitionValidatingSink(2)).setParallelism(1);
            executionEnvironment.addSource(consumer2).setParallelism(3).map(new PartitionValidatingMapper(3)).setParallelism(3).addSink(new PartitionValidatingSink(3)).setParallelism(1);
            TestUtils.tryExecute(executionEnvironment, "custom partitioning test");
            deleteTestTopic("defaultTopic");
            deleteTestTopic("dynamicTopic");
            LOG.info("Finished KafkaProducerITCase.testCustomPartitioning()");
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testOneToOneAtLeastOnceRegularSink() throws Exception {
        testOneToOneAtLeastOnce(true);
    }

    @Test
    public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
        testOneToOneAtLeastOnce(false);
    }

    protected void testOneToOneAtLeastOnce(boolean z) throws Exception {
        String str = z ? "oneToOneTopicRegularSink" : "oneToOneTopicCustomOperator";
        createTestTopic(str, 1, 1);
        KeyedSerializationSchemaWrapper keyedSerializationSchemaWrapper = new KeyedSerializationSchemaWrapper(new TypeInformationSerializationSchema(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig()));
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.enableCheckpointing(500L);
        executionEnvironment.setParallelism(1);
        executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
        executionEnvironment.getConfig().disableSysoutLogging();
        Properties properties = new Properties();
        properties.putAll(standardProps);
        properties.putAll(secureProps);
        properties.setProperty("timeout.ms", "10000");
        properties.setProperty("max.block.ms", "10000");
        properties.setProperty("batch.size", "10240000");
        properties.setProperty("linger.ms", "10000");
        KafkaTestEnvironment kafkaTestEnvironment = kafkaServer;
        kafkaTestEnvironment.getClass();
        BrokerRestartingMapper.resetState(kafkaTestEnvironment::blockProxyTraffic);
        SingleOutputStreamOperator map = executionEnvironment.fromCollection(getIntegersSequence(1000)).map(new BrokerRestartingMapper(333));
        StreamSink producerSink = kafkaServer.getProducerSink(str, keyedSerializationSchemaWrapper, properties, new FlinkKafkaPartitioner<Integer>() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.3
            public int partition(Integer num, byte[] bArr, byte[] bArr2, String str2, int[] iArr) {
                return 0;
            }
        });
        if (z) {
            map.addSink(producerSink.getUserFunction());
        } else {
            kafkaServer.produceIntoKafka(map, str, keyedSerializationSchemaWrapper, properties, new FlinkKafkaPartitioner<Integer>() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.4
                public int partition(Integer num, byte[] bArr, byte[] bArr2, String str2, int[] iArr) {
                    return 0;
                }
            });
        }
        FailingIdentityMapper.failedBefore = false;
        try {
            executionEnvironment.execute("One-to-one at least once test");
            Assert.fail("Job should fail!");
        } catch (JobExecutionException e) {
        }
        kafkaServer.unblockProxyTraffic();
        assertAtLeastOnceForTopic(properties, str, 0, Collections.unmodifiableSet(new HashSet(getIntegersSequence(BrokerRestartingMapper.lastSnapshotedElementBeforeShutdown))), KAFKA_READ_TIMEOUT);
        deleteTestTopic(str);
    }

    @Test
    public void testExactlyOnceRegularSink() throws Exception {
        testExactlyOnce(true, 1);
    }

    @Test
    public void testExactlyOnceCustomOperator() throws Exception {
        testExactlyOnce(false, 1);
    }

    protected void testExactlyOnce(boolean z, int i) throws Exception {
        String str = (z ? "exactlyOnceTopicRegularSink" : "exactlyTopicCustomOperator") + i;
        for (int i2 = 0; i2 < i; i2++) {
            createTestTopic(str + i2, 1, 1);
        }
        KeyedSerializationSchemaWrapper keyedSerializationSchemaWrapper = new KeyedSerializationSchemaWrapper(new TypeInformationSerializationSchema(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig()));
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.enableCheckpointing(500L);
        executionEnvironment.setParallelism(1);
        executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
        executionEnvironment.getConfig().disableSysoutLogging();
        Properties properties = new Properties();
        properties.putAll(standardProps);
        properties.putAll(secureProps);
        List<Integer> integersSequence = getIntegersSequence(1000);
        SingleOutputStreamOperator map = executionEnvironment.addSource(new IntegerSource(1000)).map(new FailingIdentityMapper(333));
        for (int i3 = 0; i3 < i; i3++) {
            FlinkKafkaPartitioner<Integer> flinkKafkaPartitioner = new FlinkKafkaPartitioner<Integer>() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.5
                public int partition(Integer num, byte[] bArr, byte[] bArr2, String str2, int[] iArr) {
                    return 0;
                }
            };
            if (z) {
                map.addSink(kafkaServer.getProducerSink(str + i3, keyedSerializationSchemaWrapper, properties, flinkKafkaPartitioner).getUserFunction());
            } else {
                kafkaServer.produceIntoKafka(map, str + i3, keyedSerializationSchemaWrapper, properties, flinkKafkaPartitioner);
            }
        }
        FailingIdentityMapper.failedBefore = false;
        TestUtils.tryExecute(executionEnvironment, "Exactly once test");
        for (int i4 = 0; i4 < i; i4++) {
            assertExactlyOnceForTopic(properties, str + i4, 0, integersSequence, KAFKA_READ_TIMEOUT);
            deleteTestTopic(str + i4);
        }
    }

    private List<Integer> getIntegersSequence(int i) {
        ArrayList arrayList = new ArrayList(i);
        for (int i2 = 0; i2 < i; i2++) {
            arrayList.add(Integer.valueOf(i2));
        }
        return arrayList;
    }
}
