package com.codeloom.stream;

import com.codeloom.crypt.util.RSATools;
import com.codeloom.settings.Properties;
import com.codeloom.settings.PropertiesConstants;
import com.codeloom.stream.Flowable;
import com.codeloom.util.JsonTools;
import com.codeloom.util.Reportable;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/codeloom/stream/HandleWorker.class */
/**
 * Buffers items in an in-memory queue and hands them to an {@link AbstractHandler}
 * from a single-threaded scheduled executor.
 *
 * <p>Producers call {@link #handle(Flowable, long)} to enqueue an item. The worker
 * schedules itself with a fixed delay of {@code interval} milliseconds; each run
 * drains the queue through {@code handler.onHandle} and then calls
 * {@code handler.onFlush}. Items that implement {@code AutoClear} get
 * {@code addRef()} when enqueued and {@code release()} once they have been handed
 * to the handler.
 *
 * <p>The queue-length check in {@link #handle(Flowable, long)} is a separate read
 * from the enqueue that follows it, so with several concurrent producers the
 * configured limit is a soft bound rather than a hard one.
 *
 * @param <D> type of the buffered items
 */
public class HandleWorker<D extends Flowable> implements Runnable, Reportable, AutoCloseable {
    protected static final Logger LOG = LoggerFactory.getLogger(HandleWorker.class);

    /** Default delay between two flushes, in milliseconds. */
    private static final long DEFAULT_INTERVAL_MS = 1000L;

    /**
     * Default queue capacity.
     * NOTE(review): this reuses RSATools.KEY_LENGTH, which looks unrelated to queue
     * sizing and may be an artifact of decompilation - confirm the intended value.
     */
    private static final int DEFAULT_MAX_QUEUE_LENGTH = RSATools.KEY_LENGTH;

    /** Upper bound, in milliseconds, that {@link #close()} waits for an in-flight flush. */
    private static final long CLOSE_TIMEOUT_MS = 5000L;

    /** Delay between two flushes, in milliseconds (property {@code async.interval}). */
    protected long interval;

    /** Items waiting to be handed to the handler. */
    protected ConcurrentLinkedQueue<D> queue;

    /** Queue length at which the queue counts as full (property {@code async.maxQueueLength}). */
    protected int maxQueueLength;

    /** Receiver of the buffered items. */
    protected AbstractHandler<D> handler;

    /** When {@code true}, new items are dropped while the queue is full (property {@code async.abandonWhenFull}). */
    private final boolean abandonWhenFull;

    /** Number of queued items, tracked separately from the queue itself. */
    protected AtomicLong currentQueueLength = new AtomicLong(0);

    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

    /**
     * Creates the worker, reads its settings and starts the periodic flush.
     *
     * @param abstractHandler handler that receives the buffered items
     * @param properties      source of the {@code async.interval},
     *                        {@code async.maxQueueLength} and
     *                        {@code async.abandonWhenFull} settings
     */
    public HandleWorker(AbstractHandler<D> abstractHandler, Properties properties) {
        this.handler = abstractHandler;
        this.interval = PropertiesConstants.getLong(properties, "async.interval", DEFAULT_INTERVAL_MS, true);
        this.maxQueueLength = PropertiesConstants.getInt(properties, "async.maxQueueLength", DEFAULT_MAX_QUEUE_LENGTH, true);
        this.abandonWhenFull = PropertiesConstants.getBoolean(properties, "async.abandonWhenFull", true, true);
        this.queue = new ConcurrentLinkedQueue<>();
        // Scheduled last so that every field is assigned before the first run can happen.
        this.scheduledExecutorService.scheduleWithFixedDelay(this, this.interval, this.interval, TimeUnit.MILLISECONDS);
    }

    /**
     * Periodic task body: flushes the queue using the current wall-clock time.
     * Exceptions are logged instead of rethrown, because a scheduled periodic task
     * that throws is not executed again.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            flush(System.currentTimeMillis());
        } catch (Exception e) {
            LOG.error("Failed to flush queue.", e);
        }
    }

    /**
     * Writes the worker settings and the current queue length into the given map.
     *
     * @param map report target; nothing is written when it is {@code null}
     */
    @Override // com.codeloom.util.Reportable
    public void report(Map<String, Object> map) {
        if (map == null) {
            return;
        }
        JsonTools.setLong(map, "interval", this.interval);
        JsonTools.setLong(map, "maxQueueLength", this.maxQueueLength);
        JsonTools.setLong(map, "currentQueueLength", this.currentQueueLength.get());
    }

    /**
     * Queues an item for asynchronous handling, or applies the full-queue policy
     * when the queue already holds {@code maxQueueLength} items.
     *
     * @param d item to queue
     * @param j value passed through to {@code handler.onHandle} if an older item has
     *          to be handled synchronously (presumably a timestamp - confirm against callers)
     */
    public void handle(D d, long j) {
        // ">=" so that the queue never grows beyond maxQueueLength; ">" admitted one extra item.
        if (this.currentQueueLength.get() >= this.maxQueueLength) {
            LOG.warn("Current queue is full, length: {}, handler: {}", this.currentQueueLength.get(), this.handler.getClass().getName());
            handleWhenFull(d, j);
            return;
        }
        enqueue(d);
    }

    /**
     * Full-queue policy: either drops the new item, or handles the oldest queued
     * item on the calling thread to make room and then queues the new one.
     */
    private void handleWhenFull(D d, long j) {
        if (this.abandonWhenFull) {
            LOG.warn("Data is abandoned because the queue is full");
            return;
        }
        D oldest = this.queue.poll();
        if (oldest != null) {
            dispatch(oldest, j);
        }
        enqueue(d);
    }

    /**
     * Drains the queue through the handler and then signals the end of the batch
     * with {@code handler.onFlush}. Does nothing, and does not call {@code onFlush},
     * when the queue is empty on entry.
     *
     * @param j value passed to {@code handler.onHandle} and {@code handler.onFlush};
     *          {@link #run()} supplies {@code System.currentTimeMillis()}
     */
    public void flush(long j) {
        if (this.queue.isEmpty()) {
            return;
        }
        D item;
        while ((item = this.queue.poll()) != null) {
            dispatch(item, j);
        }
        this.handler.onFlush(j);
    }

    /**
     * Stops the periodic flush and waits up to {@code CLOSE_TIMEOUT_MS} milliseconds
     * for a flush that is already running. Items still in the queue are not handled.
     * NOTE(review): a final drain may be desirable here, but it depends on whether
     * the handler is still usable at this point - confirm the owner's shutdown order.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        // shutdown() has to come first: awaitTermination() can only succeed once a
        // shutdown has been requested.
        this.scheduledExecutorService.shutdown();
        try {
            if (this.scheduledExecutorService.awaitTermination(CLOSE_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                LOG.info("Executor is terminated in time");
            } else {
                LOG.info("Timeout to terminate executor.");
            }
        } catch (InterruptedException e) {
            LOG.info("Interrupted when waiting for termination.");
            Thread.currentThread().interrupt();
        }
    }

    /** Takes a reference on the item if it is an AutoClear, queues it and bumps the length counter. */
    private void enqueue(D item) {
        if (item instanceof AutoClear) {
            ((AutoClear) item).addRef();
        }
        this.queue.offer(item);
        this.currentQueueLength.incrementAndGet();
    }

    /**
     * Hands an already dequeued item to the handler. The counter update and the
     * reference release sit in {@code finally} so that a throwing handler cannot
     * leave the length counter inflated or the reference held.
     */
    private void dispatch(D item, long j) {
        try {
            this.handler.onHandle(item, j);
        } finally {
            this.currentQueueLength.decrementAndGet();
            if (item instanceof AutoClear) {
                ((AutoClear) item).release();
            }
        }
    }
}
