/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.flume;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sink that serializes every incoming record with the supplied
 * {@link SerializationSchema} and sends the resulting bytes as a Flume
 * {@link Event} through an {@link RpcClient} connected to {@code host:port}.
 *
 * <p>The connection is created in {@link #open(Configuration)} and released in
 * {@link #close()}. Delivery is best-effort: when an event cannot be delivered
 * the failure is logged, the event is dropped and the connection is re-created.
 *
 * @param <IN> type of the records consumed by this sink
 */
public class FlumeSink<IN> extends RichSinkFunction<IN> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(FlumeSink.class);

    /** Maximum number of connection attempts made while opening the sink. */
    private static final int MAX_CONNECT_ATTEMPTS = 90;
    /** Pause after a failed connection attempt, in milliseconds. */
    private static final long CONNECT_RETRY_DELAY_MS = 1000L;

    // Created in open(); transient so it is never part of the serialized sink.
    private transient FlinkRpcClientFacade client;
    // Set to true once the facade has obtained an RpcClient.
    boolean initDone = false;
    String host;
    int port;
    SerializationSchema<IN, byte[]> schema;

    /**
     * Creates a sink that will connect to the given endpoint when opened.
     *
     * @param host   host name passed to the Flume RPC client factory
     * @param port   port passed to the Flume RPC client factory
     * @param schema converts each record into the byte payload of the event
     */
    public FlumeSink(String host, int port, SerializationSchema<IN, byte[]> schema) {
        this.host = host;
        this.port = port;
        this.schema = schema;
    }

    /**
     * Serializes {@code value} and hands the bytes to the Flume client.
     *
     * @param value record to send
     */
    public void invoke(IN value) {
        byte[] payload = this.schema.serialize(value);
        this.client.sendDataToFlume(payload);
    }

    /**
     * Closes the underlying RPC client, if there is one. Calling this before
     * {@link #open(Configuration)} or more than once is a no-op.
     */
    public void close() {
        if (this.client != null) {
            this.client.close();
        }
    }

    /**
     * Creates the client facade and connects it to {@code host:port}.
     *
     * @param config not read by this sink
     * @throws RuntimeException if no connection could be established within
     *         {@value #MAX_CONNECT_ATTEMPTS} attempts, or if the thread was
     *         interrupted while waiting between attempts
     */
    public void open(Configuration config) {
        this.client = new FlinkRpcClientFacade();
        this.client.init(this.host, this.port);
    }

    /** Owns the Flume {@link RpcClient} and the connect / reconnect logic. */
    private class FlinkRpcClientFacade {
        private RpcClient client;
        private String hostname;
        private int port;

        /**
         * Connects to the given endpoint, retrying after each
         * {@link FlumeException} up to {@code MAX_CONNECT_ATTEMPTS} times.
         *
         * @param hostname host to connect to
         * @param port     port to connect to
         * @throws RuntimeException if all attempts fail (the last
         *         {@link FlumeException} is attached as cause) or the thread
         *         is interrupted while waiting
         */
        public void init(String hostname, int port) {
            this.hostname = hostname;
            this.port = port;

            FlumeException lastFailure = null;
            for (int attempt = 0; attempt < MAX_CONNECT_ATTEMPTS; attempt++) {
                try {
                    this.client = RpcClientFactory.getDefaultInstance(hostname, port);
                } catch (FlumeException e) {
                    lastFailure = e;
                    pauseBeforeRetry();
                }
                if (this.client != null) {
                    FlumeSink.this.initDone = true;
                    return;
                }
            }
            throw new RuntimeException(
                    "Cannot establish connection with " + port + " at " + FlumeSink.this.host,
                    lastFailure);
        }

        /**
         * Sends {@code data} as the body of a single Flume event. If delivery
         * fails, the failure is logged, the event is dropped and the client is
         * replaced by a fresh connection.
         *
         * @param data body of the event
         * @throws FlumeException if a new connection is needed but cannot be
         *         created
         */
        public void sendDataToFlume(byte[] data) {
            if (this.client == null) {
                // An earlier reconnect failed; retry now instead of failing with an NPE.
                this.client = RpcClientFactory.getDefaultInstance(this.hostname, this.port);
            }
            Event event = EventBuilder.withBody(data);
            try {
                this.client.append(event);
            } catch (EventDeliveryException e) {
                LOG.warn("Could not deliver event to Flume at " + this.hostname + ":" + this.port
                        + "; dropping the event and re-creating the connection", e);
                reconnect();
            }
        }

        /** Closes the current RPC client, if any, and forgets it. */
        public void close() {
            RpcClient current = this.client;
            this.client = null;
            if (current != null) {
                current.close();
            }
        }

        /** Discards the current client and opens a new one to the same endpoint. */
        private void reconnect() {
            RpcClient stale = this.client;
            // Clear first so a failure below leaves the facade in the "not connected" state.
            this.client = null;
            try {
                stale.close();
            } catch (FlumeException e) {
                LOG.warn("Could not close the stale Flume client", e);
            }
            this.client = RpcClientFactory.getDefaultInstance(this.hostname, this.port);
        }

        /** Sleeps between connection attempts; aborts the connect loop when interrupted. */
        private void pauseBeforeRetry() {
            try {
                Thread.sleep(CONNECT_RETRY_DELAY_MS);
            } catch (InterruptedException e) {
                // Restore the flag so callers further up can observe the interruption.
                Thread.currentThread().interrupt();
                throw new RuntimeException(
                        "Interrupted while trying to connect " + this.port + " at " + FlumeSink.this.host,
                        e);
            }
        }
    }
}

