package org.apache.inlong.sort.standalone.source.sortsdk;

import com.google.common.base.Preconditions;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.source.AbstractSource;
import org.apache.inlong.sdk.sort.api.SortClient;
import org.apache.inlong.sdk.sort.api.SortClientConfig;
import org.apache.inlong.sdk.sort.api.SortClientFactory;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
import org.apache.inlong.sort.standalone.source.sortsdk.FetchCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.class */
public final class SortSdkSource extends AbstractSource implements Configurable, Runnable, EventDrivenSource {
    private static final int CORE_POOL_SIZE = 1;
    private Map<String, SortClient> clients;
    private String sortClusterName;
    private long reloadInterval;
    private SortSdkSourceContext context;
    private ScheduledExecutorService pool;
    private static final Logger LOG = LoggerFactory.getLogger(SortSdkSource.class);
    private static final SortClientConfig.ConsumeStrategy defaultStrategy = SortClientConfig.ConsumeStrategy.lastest;

    public synchronized void start() {
        reload();
    }

    public void stop() {
        this.pool.shutdownNow();
        this.clients.forEach((str, sortClient) -> {
            sortClient.close();
        });
    }

    @Override // java.lang.Runnable
    public void run() {
        reload();
    }

    public void configure(Context context) {
        this.clients = new ConcurrentHashMap();
        this.sortClusterName = SortClusterConfigHolder.getClusterConfig().getClusterName();
        Preconditions.checkState(context != null, "No context, configure failed");
        this.context = new SortSdkSourceContext(getName(), context);
        this.reloadInterval = this.context.getReloadInterval();
        initReloadExecutor();
    }

    private void initReloadExecutor() {
        this.pool = Executors.newScheduledThreadPool(1);
        this.pool.scheduleAtFixedRate(this, this.reloadInterval, this.reloadInterval, TimeUnit.SECONDS);
    }

    private void reload() {
        SortClient newClient;
        List sortTasks = SortClusterConfigHolder.getClusterConfig().getSortTasks();
        LOG.info("start to reload SortSdkSource");
        Iterator it = sortTasks.iterator();
        while (it.hasNext()) {
            String name = ((SortTaskConfig) it.next()).getName();
            if (this.clients.get(name) == null && (newClient = newClient(name)) != null) {
                this.clients.put(name, newClient);
            }
        }
    }

    private SortClient newClient(String str) {
        LOG.info("Start to new sort client for id: {}", str);
        try {
            SortClientConfig sortClientConfig = new SortClientConfig(str, this.sortClusterName, new DefaultTopicChangeListener(), defaultStrategy, InetAddress.getLocalHost().getHostAddress());
            FetchCallback create = FetchCallback.Factory.create(str, getChannelProcessor(), this.context);
            sortClientConfig.setCallback(create);
            SortClient createSortClient = SortClientFactory.createSortClient(sortClientConfig);
            createSortClient.init();
            create.setClient(createSortClient);
            return createSortClient;
        } catch (UnknownHostException e) {
            LOG.error("Got one UnknownHostException when init client of id: " + str, e);
            return null;
        } catch (Throwable th) {
            LOG.error("Got one throwable when init client of id: " + str, th);
            return null;
        }
    }
}
