package co.cask.cdap.flume;

import co.cask.cdap.client.StreamClient;
import co.cask.cdap.client.StreamWriter;
import co.cask.cdap.client.rest.RestStreamClient;
import co.cask.cdap.security.authentication.client.AuthenticationClient;
import co.cask.cdap.security.authentication.client.basic.BasicAuthenticationClient;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.io.Closeables;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.lifecycle.LifecycleAware;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.flume.source.SyslogSourceConfigurationConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/flume/StreamSink.class */
public class StreamSink implements Sink, LifecycleAware, Configurable {
    private static final int DEFAULT_WRITER_POOL_SIZE = 1;
    private static final boolean DEFAULT_SSL = false;
    private static final boolean DEFAULT_VERIFY_SSL_CERT = true;
    private static final String DEFAULT_VERSION = "v2";
    private static final int DEFAULT_PORT = 10000;
    private String host;
    private Integer port;
    private boolean sslEnabled;
    private boolean verifySSLCert;
    private int writerPoolSize;
    private String version;
    private String streamName;
    private StreamWriter writer;
    private StreamClient streamClient;
    private String authClientClassName;
    private String authClientPropertiesPath;
    private Channel channel;
    private String name;
    private LifecycleState lifecycleState;
    AuthenticationClient authClient;
    private static final Logger LOG = LoggerFactory.getLogger(StreamSink.class);
    private static final String DEFAULT_AUTH_CLIENT = BasicAuthenticationClient.class.getName();

    @Override // org.apache.flume.conf.Configurable
    public void configure(Context context) {
        this.host = context.getString(SyslogSourceConfigurationConstants.CONFIG_HOST);
        this.port = context.getInteger("port", 10000);
        this.sslEnabled = context.getBoolean("sslEnabled", false).booleanValue();
        this.verifySSLCert = context.getBoolean("verifySSLCert", true).booleanValue();
        this.version = context.getString("version", "v2");
        this.writerPoolSize = context.getInteger("writerPoolSize", 1).intValue();
        this.streamName = context.getString("streamName");
        this.authClientClassName = context.getString("authClientClass", DEFAULT_AUTH_CLIENT);
        this.authClientPropertiesPath = context.getString("authClientProperties", "");
        Preconditions.checkState(this.host != null, "No hostname specified");
        Preconditions.checkState(this.streamName != null, "No stream name specified");
    }

    @Override // org.apache.flume.Sink
    public void setChannel(Channel channel) {
        this.channel = channel;
    }

    @Override // org.apache.flume.Sink
    public Channel getChannel() {
        return this.channel;
    }

    @Override // org.apache.flume.Sink
    public Sink.Status process() throws EventDeliveryException {
        Sink.Status status = Sink.Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        try {
            try {
                tryReopenClientConnection();
                transaction.begin();
                Event take = channel.take();
                if (take != null) {
                    try {
                        this.writer.write(ByteBuffer.wrap(take.getBody()), take.getHeaders()).get();
                        LOG.trace("Success write to stream: {} ", this.streamName);
                    } catch (Throwable th) {
                        th = th;
                        if (th instanceof ExecutionException) {
                            th = th.getCause();
                        }
                        LOG.error("Error during writing event to stream {}", this.streamName, th);
                        throw new EventDeliveryException("Failed to send events to stream: " + this.streamName, th);
                    }
                }
                transaction.commit();
                transaction.close();
            } catch (Throwable th2) {
                transaction.close();
                throw th2;
            }
        } catch (Throwable th3) {
            transaction.rollback();
            if (th3 instanceof Error) {
                throw ((Error) th3);
            }
            if (!(th3 instanceof ChannelException)) {
                LOG.debug("Closing writer due to stream error ", th3);
                closeClientQuietly();
                closeWriterQuietly();
                throw new EventDeliveryException("Sink event sending error", th3);
            }
            LOG.error("Stream Sink {}: Unable to get event from channel {} ", getName(), channel.getName());
            status = Sink.Status.BACKOFF;
            transaction.close();
        }
        return status;
    }

    private void tryReopenClientConnection() throws IOException {
        if (this.writer == null) {
            LOG.debug("Trying to reopen stream writer {} ", this.streamName);
            try {
                createStreamClient();
            } catch (IOException e) {
                this.writer = null;
                LOG.error("Error during reopening client by name: {} for host: {}, port: {}. Reason: {} ", new Object[]{this.streamName, this.host, this.port, e.getMessage(), e});
                throw e;
            }
        }
    }

    private void createStreamClient() throws IOException {
        if (this.streamClient == null) {
            RestStreamClient.Builder builder = RestStreamClient.builder(this.host, this.port.intValue());
            builder.ssl(this.sslEnabled);
            builder.verifySSLCert(this.verifySSLCert);
            builder.writerPoolSize(this.writerPoolSize);
            builder.version(this.version);
            try {
                this.authClient = (AuthenticationClient) Class.forName(this.authClientClassName).newInstance();
                this.authClient.setConnectionInfo(this.host, this.port.intValue(), this.sslEnabled);
                if (this.authClient.isAuthEnabled()) {
                    Properties properties = new Properties();
                    properties.setProperty(BasicAuthenticationClient.VERIFY_SSL_CERT_PROP_NAME, String.valueOf(this.verifySSLCert));
                    if (this.authClientPropertiesPath == null || this.authClientPropertiesPath.isEmpty()) {
                        throw new Exception("Authentication client is enabled, but the path for properties file is either empty or null");
                    }
                    FileInputStream fileInputStream = new FileInputStream(this.authClientPropertiesPath);
                    try {
                        properties.load(fileInputStream);
                        Closeables.closeQuietly(fileInputStream);
                        this.authClient.configure(properties);
                    } catch (Throwable th) {
                        Closeables.closeQuietly(fileInputStream);
                        throw th;
                    }
                }
                builder.authClient(this.authClient);
                this.streamClient = builder.build();
            } catch (IOException e) {
                LOG.error("Cannot load properties", e);
                throw Throwables.propagate(e);
            } catch (ClassNotFoundException e2) {
                LOG.error("Can not resolve class {}: {}", new Object[]{this.authClientClassName, e2.getMessage(), e2});
                throw Throwables.propagate(e2);
            } catch (Exception e3) {
                LOG.error(e3.getMessage(), e3);
                throw Throwables.propagate(e3);
            }
        }
        try {
            if (this.writer == null) {
                this.writer = this.streamClient.createWriter(this.streamName);
            }
        } catch (Throwable th2) {
            closeWriterQuietly();
            throw new IOException("Can not create stream writer by name: " + this.streamName, th2);
        }
    }

    private void closeClientQuietly() {
        if (this.streamClient != null) {
            try {
                this.streamClient.close();
            } catch (Throwable th) {
                LOG.error("Error closing stream client. {}", th.getMessage(), th);
            }
            this.streamClient = null;
        }
        if (this.authClient != null) {
            this.authClient = null;
        }
    }

    private void closeWriterQuietly() {
        try {
            if (this.writer != null) {
                this.writer.close();
            }
        } catch (Throwable th) {
            LOG.error("Error closing writer. {}", th.getMessage(), th);
        }
        this.writer = null;
    }

    @Override // org.apache.flume.lifecycle.LifecycleAware
    public synchronized void start() {
        Preconditions.checkState(this.channel != null, "No channel configured");
        try {
            createStreamClient();
            LOG.info("StreamSink {} started.", getName());
            this.lifecycleState = LifecycleState.START;
        } catch (Throwable th) {
            LOG.error("Unable to create Stream client by name: {} for host: {}, port: {}. Reason: {} ", new Object[]{this.streamName, this.host, this.port, th.getMessage(), th});
            closeWriterQuietly();
            closeClientQuietly();
            this.lifecycleState = LifecycleState.ERROR;
        }
    }

    @Override // org.apache.flume.lifecycle.LifecycleAware
    public synchronized void stop() {
        LOG.info("StreamSink {} stopping...", getName());
        closeWriterQuietly();
        closeClientQuietly();
    }

    @Override // org.apache.flume.lifecycle.LifecycleAware
    public LifecycleState getLifecycleState() {
        return this.lifecycleState;
    }

    @Override // org.apache.flume.NamedComponent
    public void setName(String str) {
        this.name = str;
    }

    @Override // org.apache.flume.NamedComponent
    public String getName() {
        return this.name;
    }
}
