package org.apache.inlong.dataproxy.source.tcp;

import com.google.common.base.Preconditions;
import io.netty.channel.ChannelInitializer;
import java.lang.reflect.Constructor;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.Configurable;
import org.apache.inlong.dataproxy.admin.ProxyServiceMBean;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.source.SimpleTcpSource;
import org.apache.inlong.dataproxy.source.SourceContext;
import org.apache.inlong.sdk.commons.admin.AdminServiceRegister;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/source/tcp/InlongTcpSource.class */
public class InlongTcpSource extends SimpleTcpSource implements Configurable, EventDrivenSource, ProxyServiceMBean {
    public static final Logger LOG = LoggerFactory.getLogger(InlongTcpSource.class);
    protected SourceContext sourceContext;
    protected String msgFactoryName;
    protected String messageHandlerName;
    private Configurable pipelineFactoryConfigurable = null;

    @Override // org.apache.inlong.dataproxy.source.SimpleTcpSource, org.apache.inlong.dataproxy.source.BaseSource
    public synchronized void startSource() {
        super.startSource();
    }

    @Override // org.apache.inlong.dataproxy.source.SimpleTcpSource, org.apache.inlong.dataproxy.source.BaseSource
    public synchronized void stop() {
        LOG.info("[STOP SOURCE]{} stopping...", super.toString());
        super.stop();
    }

    @Override // org.apache.inlong.dataproxy.source.SimpleTcpSource, org.apache.inlong.dataproxy.source.BaseSource
    public void configure(Context context) {
        try {
            LOG.info("context is {}", context);
            super.configure(context);
            this.sourceContext = new SourceContext(this, this.allChannels, context);
            this.sourceContext.start();
            this.msgFactoryName = context.getString(ConfigConstants.MSG_FACTORY_NAME, InlongTcpChannelPipelineFactory.class.getName()).trim();
            Preconditions.checkArgument(StringUtils.isNotBlank(this.msgFactoryName), "msgFactoryName is empty");
            this.messageHandlerName = context.getString(ConfigConstants.MESSAGE_HANDLER_NAME, InlongTcpChannelHandler.class.getName());
            this.messageHandlerName = this.messageHandlerName.trim();
            Preconditions.checkArgument(StringUtils.isNotBlank(this.messageHandlerName), "messageHandlerName is empty");
            if (this.pipelineFactoryConfigurable != null) {
                this.pipelineFactoryConfigurable.configure(context);
            }
            AdminServiceRegister.register(ProxyServiceMBean.MBEAN_TYPE, getName(), this);
        } catch (Throwable th) {
            LOG.error(th.getMessage(), th);
        }
    }

    @Override // org.apache.inlong.dataproxy.source.BaseSource
    public ChannelInitializer getChannelInitializerFactory() {
        LOG.info(new StringBuffer("load msgFactory=").append(this.msgFactoryName).append(" and serviceDecoderName=").append(this.serviceDecoderName).toString());
        try {
            Constructor<?> constructor = Class.forName(this.msgFactoryName).getConstructor(SourceContext.class, String.class);
            LOG.info("Using channel processor:{}", getChannelProcessor().getClass().getName());
            Configurable configurable = (ChannelInitializer) constructor.newInstance(this.sourceContext, getProtocolName());
            if (configurable instanceof Configurable) {
                this.pipelineFactoryConfigurable = configurable;
                this.pipelineFactoryConfigurable.configure(this.sourceContext.getParentContext());
            }
            return configurable;
        } catch (Exception e) {
            LOG.error("Inlong Tcp Source start error, fail to construct ChannelPipelineFactory with name {}, ex {}", this.msgFactoryName, e);
            stop();
            throw new FlumeException(e.getMessage(), e);
        }
    }

    @Override // org.apache.inlong.dataproxy.source.SimpleTcpSource, org.apache.inlong.dataproxy.source.BaseSource
    public String getProtocolName() {
        return ConfigConstants.TCP_PROTOCOL;
    }

    @Override // org.apache.inlong.dataproxy.admin.ProxyServiceMBean
    public void stopService() {
        this.sourceContext.setRejectService(true);
    }

    @Override // org.apache.inlong.dataproxy.admin.ProxyServiceMBean
    public void recoverService() {
        this.sourceContext.setRejectService(false);
    }
}
