package org.apache.flink.streaming.connectors.flume;

import org.apache.commons.lang.SerializationUtils;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/flume/FlumeTopology.class */
public class FlumeTopology {
    private static final Logger LOG = LoggerFactory.getLogger(FlumeTopology.class);

    /* loaded from: input_file:org/apache/flink/streaming/connectors/flume/FlumeTopology$MyFlumePrintSink.class */
    /**
     * Sink that reports every incoming string through the enclosing class's logger.
     *
     * <p>Nothing is done when the INFO level is disabled for that logger.
     */
    public static final class MyFlumePrintSink implements SinkFunction<String> {
        private static final long serialVersionUID = 1;

        /**
         * Logs the received element at INFO level.
         *
         * @param str the element handed to this sink
         */
        public void invoke(String str) {
            final Logger log = FlumeTopology.LOG;
            if (!log.isInfoEnabled()) {
                // INFO is switched off: nothing to report.
                return;
            }
            log.info("String: <{}> arrived from Flume", str);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/flume/FlumeTopology$MyFlumeSink.class */
    public static class MyFlumeSink extends FlumeSink<String> {
        private static final long serialVersionUID = 1;

        public MyFlumeSink(String str, int i) {
            super(str, i);
        }

        @Override // org.apache.flink.streaming.connectors.flume.FlumeSink
        public byte[] serialize(String str) {
            if (str.equals("q")) {
                try {
                    sendAndClose();
                } catch (Exception e) {
                    throw new RuntimeException("Error while closing Flume connection with " + this.port + " at " + this.host, e);
                }
            }
            return SerializationUtils.serialize(str);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/flume/FlumeTopology$MyFlumeSource.class */
    /**
     * {@link FlumeSource} for strings that rebuilds each element from bytes with
     * {@link SerializationUtils#deserialize}.
     *
     * <p>When the decoded element equals {@code "q"}, {@code closeWithoutSend()} is called
     * before the element is returned.
     */
    public static class MyFlumeSource extends FlumeSource<String> {
        private static final long serialVersionUID = 1;

        /**
         * Passes both arguments unchanged to the {@link FlumeSource} constructor.
         *
         * @param str first superclass constructor argument (presumably the host name;
         *            confirm against {@code FlumeSource})
         * @param i   second superclass constructor argument (presumably the port;
         *            confirm against {@code FlumeSource})
         */
        MyFlumeSource(String str, int i) {
            super(str, i);
        }

        /**
         * Decodes one element and calls {@code closeWithoutSend()} when it equals
         * {@code "q"}.
         *
         * @param bArr the serialized form of a {@link String}
         * @return the decoded string, including the {@code "q"} element itself
         */
        @Override
        public String deserialize(byte[] bArr) {
            final Object decoded = SerializationUtils.deserialize(bArr);
            final String text = (String) decoded;
            if (!text.equals("q")) {
                return text;
            }
            closeWithoutSend();
            return text;
        }
    }

    /**
     * Builds and runs the example job on a local stream environment.
     *
     * <p>Two pipelines are registered before execution:
     * <ol>
     *   <li>a {@link MyFlumeSource} created with {@code "localhost"} and {@code 41414},
     *       feeding a {@link MyFlumePrintSink};</li>
     *   <li>the fixed elements {@code one, two, three, four, five, q}, feeding a
     *       {@link MyFlumeSink} created with {@code "localhost"} and {@code 42424}.</li>
     * </ol>
     *
     * @param strArr command-line arguments; not read
     * @throws Exception propagated from building or executing the job
     */
    public static void main(String[] strArr) throws Exception {
        // The argument 1 is presumably the degree of parallelism; confirm against the Flink API.
        final LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

        // Pipeline 1: Flume source -> logging sink.
        final MyFlumeSource flumeSource = new MyFlumeSource("localhost", 41414);
        env.addSource(flumeSource).addSink(new MyFlumePrintSink());

        // Pipeline 2: fixed elements -> Flume sink.
        final String[] elements = {"one", "two", "three", "four", "five", "q"};
        env.fromElements(elements).addSink(new MyFlumeSink("localhost", 42424));

        env.execute();
    }
}
