package org.apache.flink.streaming.connectors.kafka;

import java.io.IOException;
import java.io.Serializable;
import java.util.Properties;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.util.InstantiationUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.class */
public class KafkaShortRetentionTestBase implements Serializable {
    private static KafkaTestEnvironment kafkaServer;
    private static Properties standardProps;
    private static ForkableFlinkMiniCluster flink;
    protected static final Logger LOG = LoggerFactory.getLogger(KafkaShortRetentionTestBase.class);
    private static boolean stopProducer = false;

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase$NonContinousOffsetsDeserializationSchema.class */
    private class NonContinousOffsetsDeserializationSchema implements KeyedDeserializationSchema<String> {
        private int numJumps;
        long nextExpected;

        private NonContinousOffsetsDeserializationSchema() {
            this.nextExpected = 0L;
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public String m4deserialize(byte[] bArr, byte[] bArr2, String str, int i, long j) throws IOException {
            if (j != this.nextExpected) {
                this.numJumps++;
                this.nextExpected = j;
                KafkaShortRetentionTestBase.LOG.info("Registered now jump at offset {}", Long.valueOf(j));
            }
            this.nextExpected++;
            try {
                Thread.sleep(10L);
                return "";
            } catch (InterruptedException e) {
                throw new RuntimeException("Stopping it");
            }
        }

        public boolean isEndOfStream(String str) {
            if (this.numJumps < 5) {
                return false;
            }
            boolean unused = KafkaShortRetentionTestBase.stopProducer = true;
            return true;
        }

        public TypeInformation<String> getProducedType() {
            return TypeInfoParser.parse("String");
        }
    }

    @BeforeClass
    public static void prepare() throws IOException, ClassNotFoundException {
        LOG.info("-------------------------------------------------------------------------");
        LOG.info("    Starting KafkaShortRetentionTestBase ");
        LOG.info("-------------------------------------------------------------------------");
        kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl"));
        LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());
        Properties properties = new Properties();
        properties.setProperty("log.retention.hours", "0");
        properties.setProperty("log.retention.minutes", "0");
        properties.setProperty("log.retention.ms", "250");
        properties.setProperty("log.retention.check.interval.ms", "100");
        kafkaServer.prepare(1, properties);
        standardProps = kafkaServer.getStandardProperties();
        Configuration configuration = new Configuration();
        configuration.setInteger("local.number-taskmanager", 1);
        configuration.setInteger("taskmanager.numberOfTaskSlots", 8);
        configuration.setInteger("taskmanager.memory.size", 16);
        configuration.setString("restart-strategy.fixed-delay.delay", "0 s");
        flink = new ForkableFlinkMiniCluster(configuration, false);
        flink.start();
    }

    @AfterClass
    public static void shutDownServices() {
        if (flink != null) {
            flink.shutdown();
        }
        kafkaServer.shutdown();
    }

    public void runAutoOffsetResetTest() throws Exception {
        Properties properties = new Properties();
        properties.setProperty("retention.ms", "250");
        kafkaServer.createTestTopic("auto-offset-reset-test", 1, 1, properties);
        StreamExecutionEnvironment createRemoteEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flink.getLeaderRPCPort(), new String[0]);
        createRemoteEnvironment.setParallelism(1);
        createRemoteEnvironment.setRestartStrategy(RestartStrategies.noRestart());
        createRemoteEnvironment.getConfig().disableSysoutLogging();
        createRemoteEnvironment.addSource(new RichParallelSourceFunction<String>() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaShortRetentionTestBase.1
            private boolean running = true;

            public void run(SourceFunction.SourceContext<String> sourceContext) throws InterruptedException {
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask() * 50000;
                int i = indexOfThisSubtask + 50000;
                while (this.running && !KafkaShortRetentionTestBase.stopProducer && indexOfThisSubtask < i) {
                    sourceContext.collect("element-" + indexOfThisSubtask);
                    indexOfThisSubtask++;
                    Thread.sleep(10L);
                }
                KafkaShortRetentionTestBase.LOG.info("Stopping producer");
            }

            public void cancel() {
                this.running = false;
            }
        }).addSink(kafkaServer.getProducer("auto-offset-reset-test", new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), standardProps, null));
        createRemoteEnvironment.addSource(kafkaServer.getConsumer("auto-offset-reset-test", new NonContinousOffsetsDeserializationSchema(), standardProps)).addSink(new DiscardingSink());
        TestUtils.tryExecute(createRemoteEnvironment, "run auto offset reset test");
        kafkaServer.deleteTestTopic("auto-offset-reset-test");
    }

    public void runFailOnAutoOffsetResetNone() throws Exception {
        boolean contains;
        boolean contains2;
        kafkaServer.createTestTopic("auto-offset-reset-none-test", 1, 1);
        StreamExecutionEnvironment createRemoteEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flink.getLeaderRPCPort(), new String[0]);
        createRemoteEnvironment.setParallelism(1);
        createRemoteEnvironment.setRestartStrategy(RestartStrategies.noRestart());
        createRemoteEnvironment.getConfig().disableSysoutLogging();
        Properties properties = new Properties();
        properties.putAll(standardProps);
        properties.setProperty("auto.offset.reset", "none");
        createRemoteEnvironment.addSource(kafkaServer.getConsumer("auto-offset-reset-none-test", (DeserializationSchema) new SimpleStringSchema(), properties)).addSink(new DiscardingSink());
        try {
            createRemoteEnvironment.execute("Test auto offset reset none");
        } finally {
            if (!contains) {
                if (!contains2) {
                }
            }
            kafkaServer.deleteTestTopic("auto-offset-reset-none-test");
        }
        kafkaServer.deleteTestTopic("auto-offset-reset-none-test");
    }
}
