package org.apache.inlong.sort.standalone.channel;

import com.google.common.base.Preconditions;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.channel.AbstractChannel;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.utils.BufferQueue;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.apache.inlong.sort.standalone.utils.SizeSemaphore;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/inlong/sort/standalone/channel/BufferQueueChannel.class */
public class BufferQueueChannel extends AbstractChannel {
    public static final Logger LOG = InlongLoggerFactory.getLogger(BufferQueueChannel.class);
    public static final String KEY_MAX_BUFFERQUEUE_SIZE_KB = "maxBufferQueueSizeKb";
    public static final String KEY_RELOADINTERVAL = "reloadInterval";
    public static final int DEFAULT_MAX_BUFFERQUEUE_SIZE_KB = 131072;
    private static SizeSemaphore globalBufferQueueSizeKb;
    private BufferQueue<ProfileEvent> bufferQueue;
    protected Timer channelTimer;
    private ThreadLocal<ProfileTransaction> currentTransaction = new ThreadLocal<>();
    private AtomicLong takeCounter = new AtomicLong(0);
    private AtomicLong putCounter = new AtomicLong(0);

    public BufferQueueChannel() {
        SizeSemaphore globalBufferQueueSizeKb2 = getGlobalBufferQueueSizeKb(CommonPropertiesHolder.getContext());
        this.bufferQueue = new BufferQueue<>(globalBufferQueueSizeKb2.maxSize() / 3, globalBufferQueueSizeKb2);
    }

    public void put(Event event) throws ChannelException {
        this.putCounter.incrementAndGet();
        this.bufferQueue.acquire(event.getBody().length);
        ProfileTransaction profileTransaction = this.currentTransaction.get();
        Preconditions.checkState(profileTransaction != null, "No transaction exists for this thread");
        if (event instanceof ProfileEvent) {
            profileTransaction.doPut((ProfileEvent) event);
        } else {
            profileTransaction.doPut(new ProfileEvent(event.getBody(), event.getHeaders()));
        }
    }

    public Event take() throws ChannelException {
        ProfileEvent profileEvent = (ProfileEvent) this.bufferQueue.pollRecord();
        if (profileEvent != null) {
            ProfileTransaction profileTransaction = this.currentTransaction.get();
            Preconditions.checkState(profileTransaction != null, "No transaction exists for this thread");
            profileTransaction.doTake(profileEvent);
            this.takeCounter.incrementAndGet();
        }
        return profileEvent;
    }

    public Transaction getTransaction() {
        ProfileTransaction profileTransaction = new ProfileTransaction(this.bufferQueue);
        this.currentTransaction.set(profileTransaction);
        return profileTransaction;
    }

    public void start() {
        super.start();
        try {
            setReloadTimer();
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }

    protected void setReloadTimer() {
        this.channelTimer = new Timer(true);
        long longValue = CommonPropertiesHolder.getLong("reloadInterval", 60000L).longValue();
        this.channelTimer.schedule(new TimerTask() { // from class: org.apache.inlong.sort.standalone.channel.BufferQueueChannel.1
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                BufferQueueChannel.LOG.info("queueSize:{},availablePermits:{},put:{},take:{}", new Object[]{Integer.valueOf(BufferQueueChannel.this.bufferQueue.size()), Integer.valueOf(BufferQueueChannel.this.bufferQueue.availablePermits()), Long.valueOf(BufferQueueChannel.this.putCounter.getAndSet(0L)), Long.valueOf(BufferQueueChannel.this.takeCounter.getAndSet(0L))});
            }
        }, new Date(System.currentTimeMillis() + longValue), longValue);
    }

    public void configure(Context context) {
    }

    public static SizeSemaphore getGlobalBufferQueueSizeKb(Context context) {
        if (globalBufferQueueSizeKb != null) {
            return globalBufferQueueSizeKb;
        }
        synchronized (LOG) {
            if (globalBufferQueueSizeKb != null) {
                return globalBufferQueueSizeKb;
            }
            globalBufferQueueSizeKb = new SizeSemaphore(context.getInteger(KEY_MAX_BUFFERQUEUE_SIZE_KB, Integer.valueOf(DEFAULT_MAX_BUFFERQUEUE_SIZE_KB)).intValue(), 1024);
            return globalBufferQueueSizeKb;
        }
    }
}
