package org.apache.inlong.sort.standalone.sink.elasticsearch;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.apache.inlong.sort.standalone.sink.SinkContext;
import org.apache.inlong.sort.standalone.utils.BufferQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/sort/standalone/sink/elasticsearch/EsSink.class */
public class EsSink extends AbstractSink implements Configurable {
    public static final Logger LOG = LoggerFactory.getLogger(EsSink.class);
    private Context parentContext;
    private BufferQueue<EsIndexRequest> dispatchQueue;
    private EsSinkContext context;
    private List<EsChannelWorker> workers = new ArrayList();
    private EsOutputChannel outputChannel;

    public void start() {
        super.start();
        try {
            this.dispatchQueue = SinkContext.createBufferQueue();
            this.context = new EsSinkContext(getName(), this.parentContext, getChannel(), this.dispatchQueue);
            this.context.start();
            for (int i = 0; i < this.context.getMaxThreads(); i++) {
                EsChannelWorker esChannelWorker = new EsChannelWorker(this.context, i);
                this.workers.add(esChannelWorker);
                esChannelWorker.start();
            }
            this.outputChannel = EsSinkFactory.createEsOutputChannel(this.context);
            this.outputChannel.init();
            this.outputChannel.start();
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }

    public void stop() {
        super.stop();
        try {
            this.context.close();
            Iterator<EsChannelWorker> it = this.workers.iterator();
            while (it.hasNext()) {
                it.next().close();
            }
            this.workers.clear();
            this.outputChannel.close();
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }

    public void configure(Context context) {
        LOG.info("start to configure:{}, context:{}.", getName(), context.toString());
        this.parentContext = context;
    }

    public Sink.Status process() throws EventDeliveryException {
        return Sink.Status.BACKOFF;
    }
}
