/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka;

import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.util.InstantiationUtil;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaShortRetentionTestBase
implements Serializable {
    protected static final Logger LOG = LoggerFactory.getLogger(KafkaShortRetentionTestBase.class);
    protected static final int NUM_TMS = 1;
    protected static final int TM_SLOTS = 8;
    protected static final int PARALLELISM = 8;
    private static KafkaTestEnvironment kafkaServer;
    private static Properties standardProps;
    private static LocalFlinkMiniCluster flink;
    @ClassRule
    public static TemporaryFolder tempFolder;
    protected static Properties secureProps;
    private static boolean stopProducer;

    @BeforeClass
    public static void prepare() throws IOException, ClassNotFoundException {
        LOG.info("-------------------------------------------------------------------------");
        LOG.info("    Starting KafkaShortRetentionTestBase ");
        LOG.info("-------------------------------------------------------------------------");
        Configuration flinkConfig = new Configuration();
        Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
        kafkaServer = (KafkaTestEnvironment)InstantiationUtil.instantiate(clazz);
        LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());
        if (kafkaServer.isSecureRunSupported()) {
            secureProps = kafkaServer.getSecureProperties();
        }
        Properties specificProperties = new Properties();
        specificProperties.setProperty("log.retention.hours", "0");
        specificProperties.setProperty("log.retention.minutes", "0");
        specificProperties.setProperty("log.retention.ms", "250");
        specificProperties.setProperty("log.retention.check.interval.ms", "100");
        kafkaServer.prepare(1, specificProperties, false);
        standardProps = kafkaServer.getStandardProperties();
        flinkConfig.setInteger("local.number-taskmanager", 1);
        flinkConfig.setInteger("taskmanager.numberOfTaskSlots", 8);
        flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
        flinkConfig.setString("restart-strategy.fixed-delay.delay", "0 s");
        flink = new LocalFlinkMiniCluster(flinkConfig, false);
        flink.start();
        TestStreamEnvironment.setAsContext((LocalFlinkMiniCluster)flink, (int)8);
    }

    @AfterClass
    public static void shutDownServices() {
        TestStreamEnvironment.unsetAsContext();
        if (flink != null) {
            flink.shutdown();
        }
        kafkaServer.shutdown();
        secureProps.clear();
    }

    public void runAutoOffsetResetTest() throws Exception {
        String topic = "auto-offset-reset-test";
        boolean parallelism = true;
        int elementsPerPartition = 50000;
        Properties tprops = new Properties();
        tprops.setProperty("retention.ms", "250");
        kafkaServer.createTestTopic("auto-offset-reset-test", 1, 1, tprops);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setRestartStrategy(RestartStrategies.noRestart());
        env.getConfig().disableSysoutLogging();
        DataStreamSource stream = env.addSource((SourceFunction)new RichParallelSourceFunction<String>(){
            private boolean running = true;

            public void run(SourceFunction.SourceContext<String> ctx) throws InterruptedException {
                int cnt;
                int limit = cnt + 50000;
                for (cnt = this.getRuntimeContext().getIndexOfThisSubtask() * 50000; this.running && !stopProducer && cnt < limit; ++cnt) {
                    ctx.collect((Object)("element-" + cnt));
                    Thread.sleep(10L);
                }
                LOG.info("Stopping producer");
            }

            public void cancel() {
                this.running = false;
            }
        });
        Properties props = new Properties();
        props.putAll((Map<?, ?>)standardProps);
        props.putAll((Map<?, ?>)secureProps);
        kafkaServer.produceIntoKafka(stream, "auto-offset-reset-test", new KeyedSerializationSchemaWrapper((SerializationSchema)new SimpleStringSchema()), props, null);
        NonContinousOffsetsDeserializationSchema deserSchema = new NonContinousOffsetsDeserializationSchema();
        FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer("auto-offset-reset-test", deserSchema, props);
        DataStreamSource consuming = env.addSource(source);
        consuming.addSink((SinkFunction)new DiscardingSink());
        TestUtils.tryExecute((StreamExecutionEnvironment)env, (String)"run auto offset reset test");
        kafkaServer.deleteTestTopic("auto-offset-reset-test");
    }

    public void runFailOnAutoOffsetResetNone() throws Exception {
        block2: {
            String topic = "auto-offset-reset-none-test";
            boolean parallelism = true;
            kafkaServer.createTestTopic("auto-offset-reset-none-test", 1, 1);
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment((String)"localhost", (int)flink.getLeaderRPCPort(), (String[])new String[0]);
            env.setParallelism(1);
            env.setRestartStrategy(RestartStrategies.noRestart());
            env.getConfig().disableSysoutLogging();
            Properties customProps = new Properties();
            customProps.putAll((Map<?, ?>)standardProps);
            customProps.putAll((Map<?, ?>)secureProps);
            customProps.setProperty("auto.offset.reset", "none");
            FlinkKafkaConsumerBase source = kafkaServer.getConsumer("auto-offset-reset-none-test", new SimpleStringSchema(), customProps);
            DataStreamSource consuming = env.addSource(source);
            consuming.addSink((SinkFunction)new DiscardingSink());
            try {
                env.execute("Test auto offset reset none");
            }
            catch (Throwable e) {
                if (e.getCause().getCause().getMessage().contains("Unable to find previous offset") || e.getCause().getCause().getMessage().contains("Undefined offset with no reset policy for partition")) break block2;
                throw e;
            }
        }
        kafkaServer.deleteTestTopic("auto-offset-reset-none-test");
    }

    public void runFailOnAutoOffsetResetNoneEager() throws Exception {
        String topic = "auto-offset-reset-none-test";
        boolean parallelism = true;
        kafkaServer.createTestTopic("auto-offset-reset-none-test", 1, 1);
        Properties customProps = new Properties();
        customProps.putAll((Map<?, ?>)standardProps);
        customProps.putAll((Map<?, ?>)secureProps);
        customProps.setProperty("auto.offset.reset", "none");
        try {
            kafkaServer.getConsumer("auto-offset-reset-none-test", new SimpleStringSchema(), customProps);
            Assert.fail((String)"should fail with an exception");
        }
        catch (IllegalArgumentException e) {
            Assert.assertTrue((boolean)e.getMessage().contains("none"));
        }
        kafkaServer.deleteTestTopic("auto-offset-reset-none-test");
    }

    static {
        tempFolder = new TemporaryFolder();
        secureProps = new Properties();
        stopProducer = false;
    }

    private class NonContinousOffsetsDeserializationSchema
    implements KeyedDeserializationSchema<String> {
        private int numJumps;
        long nextExpected = 0L;

        private NonContinousOffsetsDeserializationSchema() {
        }

        public String deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
            if (offset != this.nextExpected) {
                ++this.numJumps;
                this.nextExpected = offset;
                LOG.info("Registered now jump at offset {}", (Object)offset);
            }
            ++this.nextExpected;
            try {
                Thread.sleep(10L);
            }
            catch (InterruptedException e) {
                throw new RuntimeException("Stopping it");
            }
            return "";
        }

        public boolean isEndOfStream(String nextElement) {
            if (this.numJumps >= 5) {
                stopProducer = true;
                return true;
            }
            return false;
        }

        public TypeInformation<String> getProducedType() {
            return TypeInfoParser.parse((String)"String");
        }
    }
}

