package org.apache.eagle.service.client.impl;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.service.client.EagleServiceClientException;
import org.apache.eagle.service.client.IEagleServiceClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eagle/service/client/impl/ConcurrentSender.class */
public class ConcurrentSender implements Closeable {
    private final int parallelNum;
    private final IEagleServiceClient client;
    private static final Logger LOG = LoggerFactory.getLogger(ConcurrentSender.class);
    private int batchSize = 1000;
    private long batchInterval = 3000;
    private boolean isStarted = false;
    private final SynchronousQueue<TaggedLogAPIEntity> queue = new SynchronousQueue<>();
    private final List<Handler> handlers = Collections.synchronizedList(new LinkedList());

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/eagle/service/client/impl/ConcurrentSender$Handler.class */
    public class Handler extends BatchSender implements Runnable {
        private final long batchInterval;
        private final SynchronousQueue<TaggedLogAPIEntity> localQueue;
        private boolean isStopped;
        private long lastFlushTime;

        public Handler(SynchronousQueue<TaggedLogAPIEntity> synchronousQueue, IEagleServiceClient iEagleServiceClient, int i, long j) {
            super(iEagleServiceClient, i);
            this.localQueue = synchronousQueue;
            this.batchInterval = j;
        }

        @Override // java.lang.Runnable
        public void run() {
            if (ConcurrentSender.LOG.isDebugEnabled()) {
                ConcurrentSender.LOG.debug("Starting ...");
            }
            this.lastFlushTime = System.currentTimeMillis();
            while (!this.isStopped) {
                TaggedLogAPIEntity taggedLogAPIEntity = null;
                try {
                    taggedLogAPIEntity = this.localQueue.take();
                } catch (InterruptedException e) {
                    ConcurrentSender.LOG.error(e.getMessage(), e);
                }
                if (taggedLogAPIEntity != null) {
                    try {
                        send(taggedLogAPIEntity);
                    } catch (IOException e2) {
                        ConcurrentSender.LOG.error(e2.getMessage(), e2);
                    } catch (EagleServiceClientException e3) {
                        ConcurrentSender.LOG.error(e3.getMessage(), e3);
                    }
                    long currentTimeMillis = System.currentTimeMillis();
                    if (currentTimeMillis - this.lastFlushTime >= this.batchInterval) {
                        if (ConcurrentSender.LOG.isDebugEnabled()) {
                            ConcurrentSender.LOG.info(String.format("%s - %s >= %s", Long.valueOf(currentTimeMillis), Long.valueOf(this.lastFlushTime), Long.valueOf(this.batchInterval)));
                        }
                        try {
                            flush();
                        } catch (IOException e4) {
                            ConcurrentSender.LOG.error(e4.getMessage(), e4);
                        } catch (EagleServiceClientException e5) {
                            ConcurrentSender.LOG.error(e5.getMessage(), e5);
                        }
                    }
                } else {
                    ConcurrentSender.LOG.warn("Got null entity");
                }
            }
            if (ConcurrentSender.LOG.isDebugEnabled()) {
                ConcurrentSender.LOG.debug("Stopping ...");
            }
        }

        @Override // org.apache.eagle.service.client.impl.BatchSender, java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            this.isStopped = true;
            super.close();
        }

        @Override // org.apache.eagle.service.client.impl.BatchSender
        public void flush() throws IOException, EagleServiceClientException {
            super.flush();
            this.lastFlushTime = System.currentTimeMillis();
        }
    }

    public ConcurrentSender(IEagleServiceClient iEagleServiceClient, int i) {
        this.parallelNum = i;
        this.client = iEagleServiceClient;
    }

    public void start() {
        if (this.isStarted) {
            LOG.warn("Already started");
            return;
        }
        LOG.info("Starting with handlers = " + this.parallelNum + ", batchSize = " + this.batchSize + ", batchInterval (ms) = " + this.batchInterval);
        for (int i = 0; i < this.parallelNum; i++) {
            Handler handler = new Handler(this.queue, this.client, this.batchSize, this.batchInterval);
            Thread thread = new Thread(handler);
            thread.setDaemon(true);
            thread.setName("Sender-" + i);
            this.handlers.add(handler);
            thread.start();
        }
        this.isStarted = true;
    }

    public ConcurrentSender batchSize(int i) {
        this.batchSize = i;
        return this;
    }

    public ConcurrentSender batchInterval(long j) {
        this.batchInterval = j;
        return this;
    }

    public ConcurrentSender send(List<? extends TaggedLogAPIEntity> list) throws InterruptedException {
        Iterator<? extends TaggedLogAPIEntity> it = list.iterator();
        while (it.hasNext()) {
            send(it.next());
        }
        return this;
    }

    public ConcurrentSender send(TaggedLogAPIEntity taggedLogAPIEntity) throws InterruptedException {
        if (!this.isStarted) {
            start();
        }
        this.queue.put(taggedLogAPIEntity);
        return this;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        Iterator<Handler> it = this.handlers.iterator();
        while (it.hasNext()) {
            it.next().close();
        }
    }
}
