package org.apache.beam.runners.flink.examples.streaming;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.Properties;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSink;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.Write;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

/* loaded from: input_file:org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.class */
public class KafkaIOExamples {
    private static final String KAFKA_TOPIC = "input";
    private static final String KAFKA_AVRO_TOPIC = "output";
    private static final String KAFKA_BROKER = "localhost:9092";
    private static final String GROUP_ID = "myGroup";
    private static final String ZOOKEEPER = "localhost:2181";

    /* loaded from: input_file:org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples$KafkaAvro.class */
    public static class KafkaAvro {

        /* loaded from: input_file:org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples$KafkaAvro$AvroSerializationDeserializationSchema.class */
        static class AvroSerializationDeserializationSchema<T> implements SerializationSchema<T>, DeserializationSchema<T> {
            private final Class<T> avroType;
            private final AvroCoder<T> coder;
            private transient ByteArrayOutputStream out = new ByteArrayOutputStream();

            AvroSerializationDeserializationSchema(Class<T> cls) {
                this.avroType = cls;
                this.coder = AvroCoder.of(cls);
            }

            public byte[] serialize(T t) {
                if (this.out == null) {
                    this.out = new ByteArrayOutputStream();
                }
                try {
                    this.out.reset();
                    this.coder.encode(t, this.out, Coder.Context.NESTED);
                    return this.out.toByteArray();
                } catch (IOException e) {
                    throw new RuntimeException("Avro encoding failed.", e);
                }
            }

            public T deserialize(byte[] bArr) throws IOException {
                return (T) this.coder.decode(new ByteArrayInputStream(bArr), Coder.Context.NESTED);
            }

            public boolean isEndOfStream(T t) {
                return false;
            }

            public TypeInformation<T> getProducedType() {
                return TypeExtractor.getForClass(this.avroType);
            }
        }

        /* loaded from: input_file:org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples$KafkaAvro$MyType.class */
        static class MyType implements Serializable {
            String word;
            long count;

            public MyType() {
            }

            MyType(String str, long j) {
                this.word = str;
                this.count = j;
            }

            public String toString() {
                return "MyType{word='" + this.word + "', count=" + this.count + '}';
            }
        }

        /* loaded from: input_file:org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples$KafkaAvro$ReadAvroFromKafka.class */
        public static class ReadAvroFromKafka {
            public static void main(String[] strArr) {
                Pipeline initializePipeline = KafkaIOExamples.initializePipeline(strArr);
                KafkaOptions options = KafkaIOExamples.getOptions(initializePipeline);
                initializePipeline.apply(Read.from(UnboundedFlinkSource.of(new FlinkKafkaConsumer08(options.getKafkaAvroTopic(), new AvroSerializationDeserializationSchema(MyType.class), KafkaIOExamples.getKafkaProps(options))))).setCoder(AvroCoder.of(MyType.class)).apply(ParDo.of(new PrintFn()));
                initializePipeline.run();
            }
        }

        /* loaded from: input_file:org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples$KafkaAvro$WriteAvroToKafka.class */
        public static class WriteAvroToKafka {
            public static void main(String[] strArr) {
                Pipeline initializePipeline = KafkaIOExamples.initializePipeline(strArr);
                KafkaOptions options = KafkaIOExamples.getOptions(initializePipeline);
                initializePipeline.apply(Create.of(new MyType("word", 1L), new MyType[]{new MyType("another", 2L), new MyType("yet another", 3L)})).apply(Write.to(UnboundedFlinkSink.of(new FlinkKafkaProducer08(options.getKafkaAvroTopic(), new AvroSerializationDeserializationSchema(MyType.class), KafkaIOExamples.getKafkaProps(options)))));
                initializePipeline.run();
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples$KafkaOptions.class */
    public interface KafkaOptions extends FlinkPipelineOptions {
        @Default.String(KafkaIOExamples.KAFKA_TOPIC)
        @Description("The Kafka topic to read from")
        String getKafkaTopic();

        void setKafkaTopic(String str);

        void setKafkaAvroTopic(String str);

        @Default.String(KafkaIOExamples.KAFKA_AVRO_TOPIC)
        @Description("The Kafka topic to read from")
        String getKafkaAvroTopic();

        @Default.String(KafkaIOExamples.KAFKA_BROKER)
        @Description("The Kafka Broker to read from")
        String getBroker();

        void setBroker(String str);

        @Default.String(KafkaIOExamples.ZOOKEEPER)
        @Description("The Zookeeper server to connect to")
        String getZookeeper();

        void setZookeeper(String str);

        @Default.String(KafkaIOExamples.GROUP_ID)
        @Description("The groupId")
        String getGroup();

        void setGroup(String str);
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples$KafkaString.class */
    public static class KafkaString {

        /* loaded from: input_file:org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples$KafkaString$ReadStringFromKafka.class */
        public static class ReadStringFromKafka {
            public static void main(String[] strArr) {
                Pipeline initializePipeline = KafkaIOExamples.initializePipeline(strArr);
                KafkaOptions options = KafkaIOExamples.getOptions(initializePipeline);
                initializePipeline.apply(Read.from(UnboundedFlinkSource.of(new FlinkKafkaConsumer08(options.getKafkaTopic(), new SimpleStringSchema(), KafkaIOExamples.getKafkaProps(options))))).setCoder(StringUtf8Coder.of()).apply(ParDo.of(new PrintFn()));
                initializePipeline.run();
            }
        }

        /* loaded from: input_file:org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples$KafkaString$WriteStringToKafka.class */
        public static class WriteStringToKafka {
            public static void main(String[] strArr) {
                Pipeline initializePipeline = KafkaIOExamples.initializePipeline(strArr);
                KafkaOptions options = KafkaIOExamples.getOptions(initializePipeline);
                initializePipeline.apply(Create.of("These", new String[]{"are", "some", "words"})).apply(Write.to(UnboundedFlinkSink.of(new FlinkKafkaProducer08(options.getKafkaTopic(), new SimpleStringSchema(), KafkaIOExamples.getKafkaProps(options)))));
                initializePipeline.run();
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples$PrintFn.class */
    private static class PrintFn<T> extends DoFn<T, T> {
        private PrintFn() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<T, T>.ProcessContext processContext) throws Exception {
            System.out.println(processContext.element().toString());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Pipeline initializePipeline(String[] strArr) {
        KafkaOptions as = PipelineOptionsFactory.fromArgs(strArr).as(KafkaOptions.class);
        as.setStreaming(true);
        as.setRunner(FlinkRunner.class);
        as.setCheckpointingInterval(1000L);
        as.setNumberOfExecutionRetries(5);
        as.setExecutionRetryDelay(3000L);
        return Pipeline.create(as);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static KafkaOptions getOptions(Pipeline pipeline) {
        return pipeline.getOptions().as(KafkaOptions.class);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Properties getKafkaProps(KafkaOptions kafkaOptions) {
        Properties properties = new Properties();
        properties.setProperty("zookeeper.connect", kafkaOptions.getZookeeper());
        properties.setProperty("bootstrap.servers", kafkaOptions.getBroker());
        properties.setProperty("group.id", kafkaOptions.getGroup());
        return properties;
    }
}
