/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.pipe.extractor.realtime.assigner;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.extractor.realtime.assigner.DisruptorQueueExceptionHandler;

/**
 * Thin generic wrapper around an LMAX {@link Disruptor} that lets callers publish plain
 * objects of type {@code E} without dealing with ring-buffer slots directly.
 *
 * <p>Each ring-buffer slot is a {@link Container} holding one published object. Instances are
 * created exclusively through {@link Builder#build()}, which also starts the disruptor.
 *
 * @param <E> type of the objects published to, and consumed from, the queue
 */
public class DisruptorQueue<E> {
    // Both fields are final so that a queue handed to several producer threads is safely
    // published once the constructor has returned.
    private final Disruptor<Container<E>> disruptor;
    private final RingBuffer<Container<E>> ringBuffer;

    /**
     * Wraps a fully configured disruptor. Only {@link Builder#build()} calls this, after the
     * disruptor has been started.
     *
     * @param disruptor the configured disruptor backing this queue
     */
    private DisruptorQueue(Disruptor<Container<E>> disruptor) {
        this.disruptor = disruptor;
        this.ringBuffer = disruptor.getRingBuffer();
    }

    /**
     * Publishes {@code obj} by storing it into a ring-buffer slot via
     * {@link RingBuffer#publishEvent}.
     *
     * @param obj the object to hand to the registered event handlers
     */
    public void publish(E obj) {
        this.ringBuffer.publishEvent((container, sequence, o) -> container.setObj(o), obj);
    }

    /**
     * Halts the underlying disruptor by calling {@link Disruptor#halt()}.
     *
     * <p>NOTE(review): despite the name, this method does nothing else; in particular it does
     * not touch the objects currently stored in the ring-buffer slots.
     */
    public void clear() {
        this.disruptor.halt();
    }

    /**
     * Mutable ring-buffer slot carrying a single published object.
     *
     * <p>Nothing in this class resets the slot after handling, so the last object written stays
     * referenced until the slot is overwritten by a later publish.
     *
     * @param <E> type of the carried object
     */
    private static class Container<E> {
        private E obj;

        private Container() {
        }

        /** Returns the object currently stored in this slot (may be {@code null} before first use). */
        public E getObj() {
            return this.obj;
        }

        /** Stores {@code obj} in this slot, replacing whatever was there before. */
        public void setObj(E obj) {
            this.obj = obj;
        }
    }

    /**
     * Builder for {@link DisruptorQueue}.
     *
     * <p>Defaults: ring-buffer size taken from {@link PipeConfig}, {@link DaemonThreadFactory},
     * {@link ProducerType#MULTI} and a {@link BlockingWaitStrategy}. Only the producer type and
     * the event handlers can be changed through this builder.
     *
     * @param <E> type of the objects the built queue will carry
     */
    public static class Builder<E> {
        private int ringBufferSize = PipeConfig.getInstance().getPipeExtractorAssignerDisruptorRingBufferSize();
        private ThreadFactory threadFactory = DaemonThreadFactory.INSTANCE;
        private ProducerType producerType = ProducerType.MULTI;
        private WaitStrategy waitStrategy = new BlockingWaitStrategy();
        private final List<EventHandler<E>> handlers = new ArrayList<>();

        /**
         * Overrides the producer type used when the disruptor is created.
         *
         * @param producerType the producer type to use
         * @return this builder, for chaining
         */
        public Builder<E> setProducerType(ProducerType producerType) {
            this.producerType = producerType;
            return this;
        }

        /**
         * Adds a handler that will receive the published objects.
         *
         * @param eventHandler handler to register when {@link #build()} is called
         * @return this builder, for chaining
         */
        public Builder<E> addEventHandler(EventHandler<E> eventHandler) {
            this.handlers.add(eventHandler);
            return this;
        }

        /**
         * Creates the disruptor, registers every added handler, installs a
         * {@link DisruptorQueueExceptionHandler} as the default exception handler, starts the
         * disruptor and returns the ready-to-use queue.
         *
         * <p>Each handler is registered with its own {@code handleEventsWith} call and is wrapped
         * so that it receives the unwrapped object rather than the {@link Container} slot.
         *
         * @return a started queue backed by a new disruptor
         */
        public DisruptorQueue<E> build() {
            Disruptor<Container<E>> disruptor =
                new Disruptor<>(
                    Container::new,
                    this.ringBufferSize,
                    this.threadFactory,
                    this.producerType,
                    this.waitStrategy);
            for (EventHandler<E> handler : this.handlers) {
                // Adapt the user's EventHandler<E> to the slot type by unwrapping the container.
                disruptor.handleEventsWith(
                    (container, sequence, endOfBatch) ->
                        handler.onEvent(container.getObj(), sequence, endOfBatch));
            }
            // NOTE(review): assumes DisruptorQueueExceptionHandler implements
            // ExceptionHandler<Object> (or another supertype-compatible parameterisation), so no
            // raw cast is needed — confirm against that class.
            disruptor.setDefaultExceptionHandler(new DisruptorQueueExceptionHandler());
            disruptor.start();
            // The queue is constructed only after start(), matching the original order in which
            // the ring buffer reference was obtained.
            return new DisruptorQueue<>(disruptor);
        }
    }
}

