package org.apache.flink.streaming.connectors.kafka;

import java.nio.charset.StandardCharsets;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaTopology.class */
public class KafkaTopology {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaTopology.class);
    private static final int SOURCE_PARALELISM = 1;

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaTopology$MyKafkaPrintSink.class */
    /**
     * Sink that reports every string it receives through the enclosing class's logger
     * at INFO level. Nothing is written when INFO logging is disabled.
     */
    public static final class MyKafkaPrintSink implements SinkFunction<String> {
        private static final long serialVersionUID = 1L;

        /**
         * Logs the given record.
         *
         * @param str the record handed to this sink
         */
        public void invoke(String str) {
            // Bail out early when INFO is switched off; otherwise emit the message.
            if (!LOG.isInfoEnabled()) {
                return;
            }
            LOG.info("String: <{}> arrived from Kafka", str);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaTopology$MyKafkaSink.class */
    /**
     * String-to-string {@code KafkaSink} whose serialization is the identity function,
     * with one special case: the record {@code "q"} triggers the inherited
     * {@code sendAndClose()} before it is returned.
     */
    public static final class MyKafkaSink extends KafkaSink<String, String> {
        private static final long serialVersionUID = 1L;

        /**
         * Forwards both arguments unchanged to the {@code KafkaSink} constructor.
         *
         * @param str  first superclass argument ({@code main} passes {@code "test"};
         *             presumably the topic name - confirm against {@code KafkaSink})
         * @param str2 second superclass argument ({@code main} passes
         *             {@code "localhost:9092"}; presumably the broker address - confirm
         *             against {@code KafkaSink})
         */
        public MyKafkaSink(String str, String str2) {
            super(str, str2);
        }

        /**
         * Returns the record as-is, calling {@code sendAndClose()} first when the record
         * is exactly {@code "q"}.
         *
         * @param str the record to serialize
         * @return the same string that was passed in
         */
        @Override
        public String serialize(String str) {
            boolean isQuitMarker = str.equals("q");
            if (isQuitMarker) {
                sendAndClose();
            }
            return str;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaTopology$MyKafkaSource.class */
    /**
     * {@code KafkaSource} of strings. Each raw message is decoded as UTF-8 text; when the
     * decoded text is exactly {@code "q"} the inherited {@code closeWithoutSend()} is
     * called before the text is returned.
     */
    public static final class MyKafkaSource extends KafkaSource<String> {
        private static final long serialVersionUID = 1L;

        /**
         * Forwards all four arguments unchanged to the {@code KafkaSource} constructor.
         *
         * @param str  first superclass argument ({@code main} passes
         *             {@code "localhost:2181"}; presumably the ZooKeeper address - confirm
         *             against {@code KafkaSource})
         * @param str2 second superclass argument ({@code main} passes {@code "group"};
         *             presumably the consumer group - confirm)
         * @param str3 third superclass argument ({@code main} passes {@code "test"};
         *             presumably the topic name - confirm)
         * @param i    fourth superclass argument ({@code main} passes {@code 1}; meaning
         *             not visible here - confirm)
         */
        public MyKafkaSource(String str, String str2, String str3, int i) {
            super(str, str2, str3, i);
        }

        /**
         * Decodes a raw message into a string.
         *
         * @param bArr the raw message bytes
         * @return the message decoded as UTF-8
         */
        @Override
        public String deserialize(byte[] bArr) {
            // Decode with an explicit charset so the result does not depend on the
            // JVM's platform default encoding.
            String str = new String(bArr, StandardCharsets.UTF_8);
            if ("q".equals(str)) {
                closeWithoutSend();
            }
            // Strings are immutable, so the decoded value is returned directly
            // instead of being copied.
            return str;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaTopology$MySource.class */
    /**
     * Source that emits the decimal strings {@code "0"} through {@code "9"} in ascending
     * order, followed by the single string {@code "q"}.
     */
    public static final class MySource implements SourceFunction<String> {
        private static final long serialVersionUID = 1L;

        /**
         * Emits the ten number strings and then {@code "q"} to the given collector.
         *
         * @param collector receives every emitted record
         * @throws Exception declared by the method signature; nothing in this body throws
         *                   a checked exception itself
         */
        public void invoke(Collector<String> collector) throws Exception {
            // Step by exactly one. The stride must not be tied to the topology's
            // parallelism constant: a different parallelism would skip numbers, and a
            // value of 0 would never terminate.
            for (int i = 0; i < 10; i++) {
                collector.collect(Integer.toString(i));
            }
            collector.collect("q");
        }
    }

    /**
     * Builds the demo topology on a local stream environment and executes it.
     *
     * <p>Two pipelines are registered before execution: a {@code MyKafkaSource} feeding a
     * {@code MyKafkaPrintSink}, and a {@code MySource} feeding a {@code MyKafkaSink}.
     * The host/port, group and topic strings are hard-coded.
     *
     * @param strArr command-line arguments; not read
     * @throws Exception propagated from environment setup or execution
     */
    public static void main(String[] strArr) throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(SOURCE_PARALELISM);

        // Pipeline 1: Kafka source -> logging sink.
        MyKafkaSource kafkaSource = new MyKafkaSource("localhost:2181", "group", "test", SOURCE_PARALELISM);
        env.addSource(kafkaSource, SOURCE_PARALELISM).addSink(new MyKafkaPrintSink());

        // Pipeline 2: generated strings -> Kafka sink.
        MySource generatedSource = new MySource();
        env.addSource(generatedSource).addSink(new MyKafkaSink("test", "localhost:9092"));

        env.execute();
    }
}
