package org.apache.inlong.sort.standalone.source.sortsdk;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.ClassUtils;
import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.source.AbstractSource;
import org.apache.inlong.sdk.commons.admin.AdminServiceRegister;
import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
import org.apache.inlong.sdk.sort.api.SortClient;
import org.apache.inlong.sdk.sort.api.SortClientConfig;
import org.apache.inlong.sdk.sort.api.SortClientFactory;
import org.apache.inlong.sdk.sort.impl.ManagerReportHandlerImpl;
import org.apache.inlong.sdk.sort.impl.MetricReporterImpl;
import org.apache.inlong.sort.standalone.admin.ConsumerServiceMBean;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.ManagerUrlHandler;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigType;
import org.apache.inlong.sort.standalone.config.holder.SortSourceConfigType;
import org.apache.inlong.sort.standalone.config.loader.ClassResourceQueryConsumeConfig;
import org.apache.inlong.sort.standalone.sink.SinkContext;
import org.apache.inlong.sort.standalone.source.sortsdk.FetchCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.class */
public final class SortSdkSource extends AbstractSource implements Configurable, Runnable, EventDrivenSource, ConsumerServiceMBean {
    private static final int CORE_POOL_SIZE = 1;
    private String taskName;
    private SortSdkSourceContext context;
    private String sortClusterName;
    private long reloadInterval;
    private ScheduledExecutorService pool;
    private SortClient sortClient;
    private static final Logger LOG = LoggerFactory.getLogger(SortSdkSource.class);
    private static final SortClientConfig.ConsumeStrategy defaultStrategy = SortClientConfig.ConsumeStrategy.lastest;

    public synchronized void start() {
        LOG.info("start to SortSdkSource:{}", this.taskName);
        this.sortClient = newClient(this.taskName);
    }

    public void stop() {
        this.pool.shutdownNow();
        LOG.info("Close sort client {}.", this.taskName);
        if (this.sortClient != null) {
            this.sortClient.getConfig().setStopConsume(true);
            this.sortClient.close();
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        LOG.info("start to reload SortSdkSource:{}", this.taskName);
        if (this.sortClient != null) {
            this.sortClient.getConfig().setManagerApiUrl(ManagerUrlHandler.getSortSourceConfigUrl());
        }
    }

    public void configure(Context context) {
        this.taskName = context.getString(SinkContext.KEY_TASK_NAME);
        this.context = new SortSdkSourceContext(getName(), context);
        this.sortClusterName = SortClusterConfigHolder.getClusterConfig().getClusterName();
        this.reloadInterval = this.context.getReloadInterval();
        initReloadExecutor();
        AdminServiceRegister.register(ConsumerServiceMBean.MBEAN_TYPE, this.taskName, this);
    }

    private void initReloadExecutor() {
        this.pool = Executors.newScheduledThreadPool(1);
        this.pool.scheduleAtFixedRate(this, this.reloadInterval, this.reloadInterval, TimeUnit.SECONDS);
    }

    private SortClient newClient(String str) {
        SortClient createSortClient;
        LOG.info("Start to new sort client for task: {}", str);
        try {
            SortClientConfig sortClientConfig = new SortClientConfig(str, this.sortClusterName, new DefaultTopicChangeListener(), defaultStrategy, InetAddress.getLocalHost().getHostAddress());
            FetchCallback create = FetchCallback.Factory.create(str, getChannelProcessor(), this.context);
            sortClientConfig.setCallback(create);
            String string = CommonPropertiesHolder.getString("sortSourceConfig.QueryConsumeConfigType", SortSourceConfigType.MANAGER.name());
            if (SortClusterConfigType.FILE.name().equalsIgnoreCase(string)) {
                LOG.info("Create sort sdk client in file way:{}", string);
                createSortClient = SortClientFactory.createSortClient(sortClientConfig, new ClassResourceQueryConsumeConfig(), new MetricReporterImpl(sortClientConfig), new ManagerReportHandlerImpl());
            } else if (SortClusterConfigType.MANAGER.name().equalsIgnoreCase(string)) {
                LOG.info("Create sort sdk client in manager way:{}", string);
                sortClientConfig.setManagerApiUrl(ManagerUrlHandler.getSortSourceConfigUrl());
                createSortClient = SortClientFactory.createSortClient(sortClientConfig);
            } else {
                LOG.info("Create sort sdk client in custom way:{}", string);
                Object newInstance = ClassUtils.getClass(string).getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
                if (newInstance instanceof Configurable) {
                    ((Configurable) newInstance).configure(new Context(CommonPropertiesHolder.get()));
                }
                if (!(newInstance instanceof QueryConsumeConfig)) {
                    LOG.error("Got exception when create QueryConsumeConfig instance,config key:{},config class:{}", "sortSourceConfig.QueryConsumeConfigType", string);
                    return null;
                }
                createSortClient = SortClientFactory.createSortClient(sortClientConfig, (QueryConsumeConfig) newInstance, new MetricReporterImpl(sortClientConfig), new ManagerReportHandlerImpl());
            }
            createSortClient.init();
            create.setClient(createSortClient);
            return createSortClient;
        } catch (UnknownHostException e) {
            LOG.error("Got one UnknownHostException when init client of id:{}", str, e);
            return null;
        } catch (Throwable th) {
            LOG.error("Got one throwable when init client of id:{}", str, th);
            return null;
        }
    }

    @Override // org.apache.inlong.sort.standalone.admin.ConsumerServiceMBean
    public void stopConsumer() {
        this.sortClient.getConfig().setStopConsume(true);
    }

    @Override // org.apache.inlong.sort.standalone.admin.ConsumerServiceMBean
    public void recoverConsumer() {
        this.sortClient.getConfig().setStopConsume(false);
    }
}
