package com.mnt.sio.core;

import com.mnt.base.util.CommonUtil;
import com.mnt.sio.core.context.StreamContext;
import com.mnt.sio.core.dtd.StreamData;
import com.mnt.sio.core.pipe.PipeProcessor;
import com.mnt.sio.core.sin.SIn;
import com.mnt.sio.core.sout.SOut;
import com.mnt.sio.metrics.SIOMetrics;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/mnt/sio/core/PipeWorker.class */
/**
 * Drives a single {@link Pipe}: polls batches from the pipe's input ({@link SIn}),
 * runs every polled item through the pipe's {@link PipeProcessor}, appends the
 * non-empty results to the pipe's output ({@link SOut}) and flushes it.
 *
 * <p>Constructing a worker registers the pipe with its {@link StreamContext},
 * calls {@code SIn.beforeRun()} and immediately starts
 * {@code max(1, sIn.threadCount())} threads that all execute {@link #run()} on
 * this same instance.
 */
public class PipeWorker implements Runnable {
    protected final Logger logger = LogManager.getLogger(getClass());
    private final SIOMetrics metrics;
    private final Pipe pipe;
    private final PipeProcessor processor;
    private final StreamContext context;
    private final SIn sIn;
    private final SOut sOut;
    /** Value of {@code sIn.sync()}; passed to {@code sOut.flush} and gates {@code sIn.commit()}. */
    private final boolean syncFlag;

    /**
     * Wires the worker to the given pipe and starts its worker threads.
     *
     * @param sIOMetrics metrics sink used for the sin/sout/err counters
     * @param pipe       pipe supplying the processor, context, input and output
     */
    public PipeWorker(SIOMetrics sIOMetrics, Pipe pipe) {
        this.metrics = sIOMetrics;
        this.pipe = pipe;
        this.processor = pipe.processor();
        this.context = pipe.context();
        this.sIn = pipe.sin();
        this.sOut = pipe.sout();
        this.syncFlag = this.sIn.sync();
        this.context.putPipe(pipe);
        this.sIn.beforeRun();
        // At least one thread is always started, even if the input reports 0 or less.
        int workerCount = Math.max(1, this.sIn.threadCount());
        for (int i = 0; i < workerCount; i++) {
            new Thread(this, "[RTPipe]:" + pipe.name() + ":" + i).start();
        }
        this.logger.info("started Pipe Worker for: {}", pipe.name());
    }

    /**
     * Worker loop: binds the pipe to {@code PipeContext.pipeTL}, initializes the
     * input and then processes batches until the thread is interrupted or a
     * batch fails with an exception. {@code SIn.destory()} is called on every
     * exit path once {@code initialize()} has returned.
     */
    @Override // java.lang.Runnable
    public void run() {
        PipeContext.pipeTL.set(this.pipe);
        // Kept outside the try: if initialize() itself fails, destory() is not invoked.
        this.sIn.initialize();
        try {
            // Stop once the interrupt flag is set; otherwise an interrupted worker
            // would keep re-entering poll() forever without ever releasing the input.
            while (!Thread.currentThread().isInterrupted()) {
                process();
            }
            this.logger.info("Pipe Worker interrupted, stopping: {}", this.pipe.name());
        } catch (Exception e) {
            // Exception passed as the trailing Throwable so the stack trace is logged.
            this.logger.warn("error while process pipe exec", e);
        } finally {
            this.sIn.destory();
        }
    }

    /**
     * Polls one batch from the input and processes it.
     *
     * <p>A failed poll is logged and followed by a 100 ms pause. An interrupt
     * (during the poll or the pause) restores the interrupt flag and returns
     * immediately so that {@link #run()} can terminate. A non-empty batch is
     * processed item by item and the output is flushed; when the input is
     * synchronous the input is committed afterwards, even if the batch was empty.
     */
    private void process() {
        List<StreamData> batch = null;
        try {
            batch = this.sIn.poll(this.processor::mapping);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        } catch (Exception e) {
            this.logger.warn("error while PipeWorker.process, poll data", e);
            try {
                // Back off briefly before the next poll attempt.
                Thread.sleep(100L);
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                return;
            }
        }
        if (!CommonUtil.isEmpty(batch)) {
            for (StreamData item : batch) {
                process(item);
            }
            this.sOut.flush(this.syncFlag);
        }
        if (this.syncFlag) {
            this.sIn.commit();
        }
    }

    /**
     * Processes a single polled item.
     *
     * <p>The {@code sin} counter is incremented for every item, including
     * {@code null} ones. Items that are {@code null} or have an empty key are
     * skipped. Otherwise the item is handed to the processor; a non-empty result
     * is merged into the context and appended to the output. A processing
     * failure is logged, counted under {@code err} and the item is dropped.
     *
     * @param streamData the polled item, may be {@code null}
     */
    private void process(StreamData streamData) {
        this.metrics.count(SIOMetrics.CountType.sin, this.sIn.name(), this.sIn.desc(), this.pipe.pipeIdf(), this.pipe.name());
        if (streamData == null) {
            return;
        }
        this.metrics.touch();
        String key = streamData.key();
        long tid = streamData.tid();
        long timestamp = streamData.timestamp();
        if (CommonUtil.isEmpty(key)) {
            this.logger.warn("StreamData key is null:{}", streamData);
            return;
        }
        // Starts as the input and becomes the processor's output, so the error log
        // below reports whichever value was current when the failure happened.
        StreamData current = streamData;
        try {
            current = this.processor.process(
                    () -> this.context.latest(key, tid),
                    () -> this.context.context(key, tid),
                    current,
                    timestamp);
            if (!CommonUtil.isEmpty(current)) {
                this.context.merge(key, this.context.latest(key, -1L), current);
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
                return;
            }
            // Two placeholders, three arguments: the exception is the extra trailing
            // argument and is therefore logged with its stack trace.
            this.logger.warn("error while process data with flow, data: {}, pipe: {}", current, this.pipe, e);
            this.metrics.count(SIOMetrics.CountType.err, "err", this.pipe.name() + ":err", this.pipe.pipeIdf(), this.pipe.name());
            current = null;
        }
        // key is known to be non-empty here (empty keys returned above).
        this.metrics.done(this.pipe.name(), key);
        if (CommonUtil.isEmpty(current)) {
            return;
        }
        this.sOut.append(current);
        this.metrics.count(SIOMetrics.CountType.sout, this.sOut.name(), this.sOut.desc(), this.pipe.pipeIdf(), this.pipe.name());
    }
}
