package net.neoremind.fountain.producer;

import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import net.neoremind.fountain.changedata.ChangeDataSet;
import net.neoremind.fountain.event.BaseLogEvent;
import net.neoremind.fountain.meta.CachedTableMetaProvider;
import net.neoremind.fountain.producer.able.Resumable;
import net.neoremind.fountain.producer.able.Suspendable;
import net.neoremind.fountain.producer.datasource.BinlogDataSource;
import net.neoremind.fountain.producer.dispatch.BinlogRowOutputUnitManager;
import net.neoremind.fountain.producer.dispatch.DispatchUnitManager;
import net.neoremind.fountain.producer.dispatch.DispatchWorkflow;
import net.neoremind.fountain.producer.dispatch.transcontrol.TransactionPolicy;
import net.neoremind.fountain.producer.exception.NormalSocketTimeoutException;
import net.neoremind.fountain.producer.exception.ProducerInitException;
import net.neoremind.fountain.producer.matcher.AbstractTableMatcher;
import net.neoremind.fountain.producer.matcher.EventMatcher;
import net.neoremind.fountain.producer.parser.Parser;
import net.neoremind.fountain.support.ThreadHolder;
import net.neoremind.fountain.support.TrxContext;
import net.neoremind.fountain.thread.annotaion.UnThreadSafe;
import net.neoremind.fountain.util.UnsignedNumberHelper;
import net.neoremind.haguard.HaGuard;
import net.neoremind.haguard.NoneHaGuard;
import org.slf4j.Logger;
import org.springframework.beans.factory.BeanNameAware;

/* loaded from: input_file:net/neoremind/fountain/producer/AbstractProducer.class */
public abstract class AbstractProducer implements SingleProducer, BeanNameAware, Resumable, Suspendable {
    /** Binlog parser; {@code init()} hands it a {@link CachedTableMetaProvider} backed by {@link #dataSource}. */
    private Parser parser;
    /** Replication source: opened in {@code prepareResource()}, read in {@code threadHandler()}, closed in {@code realeaseResource()}. */
    private BinlogDataSource dataSource;
    /** Downstream workflow; this producer registers itself with it and hands it every produced {@link ChangeDataSet}. */
    private DispatchWorkflow dispatcher;
    /** Event matcher; this class only stores it and exposes it through its getter/setter. */
    private EventMatcher matcher;
    /** Optional charset name; when non-null it is passed to {@code UnsignedNumberHelper.configCharset} on the producer thread. */
    private String dbCharset;
    /** Optional policy; when non-null {@code init()} installs it on {@link #dispatchUnitManager}. */
    private TransactionPolicy transactionPolicy;
    /** Preferred instance name; {@code getInstanceName()} falls back to {@link #beanName} when this is null. */
    private String sliceName;
    /** Name assigned through {@code setBeanName(String)}. */
    private String beanName;
    /** Created by {@code destroy()} and counted down by {@code doDestory()}; null until {@code destroy()} is called. */
    private volatile CountDownLatch destroyWait;
    /** Per-thread throughput statistics used when {@link #enableProfilingPrintInfo} is true. (Name typo is historical.) */
    private static final ThreadLocal<ProfilingInfo> PROFLING_INFO = new ThreadLocal<ProfilingInfo>() { // from class: net.neoremind.fountain.producer.AbstractProducer.3
        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.lang.ThreadLocal
        public ProfilingInfo initialValue() {
            return new ProfilingInfo();
        }
    };
    /** Turns log events into {@link ChangeDataSet}s; its cached event data is cleaned whenever the replication resource is (re)prepared. */
    protected DispatchUnitManager dispatchUnitManager = new BinlogRowOutputUnitManager();
    /** HA token guard consulted before reading and before dispatching; defaults to {@link NoneHaGuard}. */
    private HaGuard haGuard = new NoneHaGuard();
    /** Milliseconds slept between attempts in {@code ensureResource()}. */
    private int repeatResourceInterval = 5000;
    /** Milliseconds {@code start()} waits for the producer thread to signal {@link #threadStartWait}. */
    private int threadStartTimeout = 5000;
    /** True after {@code prepareResource()} succeeds; reset to false by {@code realeaseResource()}. Not volatile. */
    private boolean preparedResource = false;
    /** Counted down by the producer thread right after it registers with the dispatcher. */
    private final CountDownLatch threadStartWait = new CountDownLatch(1);
    /** Set by {@code destroy()}; polled by the producer loop to trigger {@code doDestory()}. */
    private volatile boolean shutDowning = false;
    /** When true, every dispatched change set is counted in {@link #PROFLING_INFO}. */
    private boolean enableProfilingPrintInfo = false;
    /** Set by {@code suspend()} and cleared by {@code resume()}; polled at the top of the producer loop. */
    private volatile boolean forceSuspend = false;
    /**
     * Strategy consulted when a read ends with a {@link NormalSocketTimeoutException}.
     * This default keeps no state and always returns true from {@code handleTimeout()},
     * which makes the producer loop release the data source on every such timeout.
     */
    private ConsequentSocketTimeoutHandler consequentSocketTimeoutHandler = new ConsequentSocketTimeoutHandler() { // from class: net.neoremind.fountain.producer.AbstractProducer.1
        @Override // net.neoremind.fountain.producer.ConsequentSocketTimeoutHandler
        public void clean() {
        }

        @Override // net.neoremind.fountain.producer.ConsequentSocketTimeoutHandler
        public boolean handleTimeout() {
            return true;
        }
    };

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Throughput counter that periodically writes a summary line to a logger.
     *
     * <p>Instances are not synchronized; the enclosing class keeps one per thread.
     */
    @UnThreadSafe
    /* loaded from: input_file:net/neoremind/fountain/producer/AbstractProducer$ProfilingInfo.class */
    public static class ProfilingInfo {
        /** Wall-clock time of the first {@link #doOne(Logger)} call; 0 means "not started yet". */
        long startTime;
        /** Wall-clock time of the first call or of the most recent report. */
        long lastTime;
        /** Reporting period in milliseconds; also the unit the logged "QPM" rates are normalized to. */
        float intervalTimeInMilli = 60000.0f;
        /** Number of calls since the first call. */
        long totalIncrmentCount;
        /** Number of calls since the most recent report. */
        long incrmentCount;

        private ProfilingInfo() {
        }

        /**
         * Counts one item and, once more than {@link #intervalTimeInMilli} has elapsed since the
         * previous report, logs lifetime and per-cycle statistics at INFO level.
         *
         * @param logger destination of the periodic report
         */
        void doOne(Logger logger) {
            totalIncrmentCount++;
            incrmentCount++;

            final long now = System.currentTimeMillis();
            if (startTime == 0) {
                // First item ever seen: only start the clocks.
                startTime = now;
                lastTime = now;
                return;
            }

            final long cycleMillis = now - lastTime;
            if (!(((float) cycleMillis) > intervalTimeInMilli)) {
                return;
            }

            final long upMillis = now - startTime;
            final StringBuilder report = new StringBuilder();
            report.append("upTime=").append(upMillis)
                    .append("ms,totalIncrementCount=").append(totalIncrmentCount)
                    .append(",QPM=").append(formatRate(totalIncrmentCount, upMillis))
                    .append(". cycleTime=").append(cycleMillis)
                    .append("ms,incrementCount=").append(incrmentCount)
                    .append(",QPM=").append(formatRate(incrmentCount, cycleMillis));
            logger.info(report.toString());

            lastTime = now;
            incrmentCount = 0L;
        }

        /** Formats {@code count} per reporting interval over {@code elapsedMillis}, with two decimals. */
        private String formatRate(long count, long elapsedMillis) {
            final float intervals = ((float) elapsedMillis) / intervalTimeInMilli;
            return String.format("%.2f", Float.valueOf(((float) count) / intervals));
        }
    }

    /**
     * Initializes this producer and launches its background thread.
     *
     * <p>The new thread configures the charset (when {@code dbCharset} is set), registers
     * this producer with the dispatcher, signals {@code threadStartWait} and then enters
     * {@code threadHandler()}. The calling thread waits up to {@code threadStartTimeout}
     * milliseconds for that signal.
     *
     * <p>If the signal does not arrive in time a warning is logged and the method returns
     * normally; the producer thread keeps running.
     *
     * @throws ProducerInitException if the calling thread is interrupted while waiting;
     *         the interrupt flag is restored before throwing
     */
    @Override // net.neoremind.fountain.producer.SingleProducer
    public void start() {
        init();
        Thread thread = new Thread(new Runnable() { // from class: net.neoremind.fountain.producer.AbstractProducer.2
            @Override // java.lang.Runnable
            public void run() {
                if (AbstractProducer.this.dbCharset != null) {
                    UnsignedNumberHelper.configCharset(AbstractProducer.this.dbCharset);
                }
                AbstractProducer.this.registerWorkflow();
                AbstractProducer.this.threadStartWait.countDown();
                AbstractProducer.this.threadHandler();
            }
        });
        thread.setName("fountain-" + getInstanceName() + "-" + thread.getId());
        thread.start();
        try {
            // await() returns false on timeout; do not report success in that case.
            boolean started = this.threadStartWait.await(this.threadStartTimeout, TimeUnit.MILLISECONDS);
            if (started) {
                getLogger().info("Succeed to start producer of " + getInstanceName());
            } else {
                getLogger().warn("Producer thread of {} did not signal start-up within {} ms",
                        getInstanceName(), this.threadStartTimeout);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            getLogger().error(getInstanceName(), e);
            throw new ProducerInitException();
        }
    }

    /**
     * Wires the collaborators together before the producer thread is started: binds the
     * instance name to the data source, applies the optional transaction policy, gives the
     * parser a cached table-meta provider and initializes the HA guard.
     */
    private void init() {
        dataSource.bindUniqName(getInstanceName());

        final TransactionPolicy policy = transactionPolicy;
        if (null != policy) {
            dispatchUnitManager.setTransactionPolicy(policy);
        }

        final CachedTableMetaProvider metaProvider = new CachedTableMetaProvider(dataSource);
        parser.setTableMetaProvider(metaProvider);

        haGuard.init(getInstanceName());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /** Registers this producer, identified by its instance name, with the dispatcher. */
    public void registerWorkflow() {
        final String producerName = getInstanceName();
        dispatcher.registerProducer(producerName);
    }

    /**
     * Opens the replication connection, discards event data cached by the dispatch unit
     * manager and marks the resource as prepared.
     *
     * @throws ProducerInitException if opening or cleaning fails with an
     *         {@link IOException}, {@link NoSuchAlgorithmException} or
     *         {@link TimeoutException}; the failure is logged first
     */
    private void prepareResource() {
        try {
            this.dataSource.openReplication();
            this.dispatchUnitManager.cleanCachedEventData();
            this.preparedResource = true;
        } catch (IOException e) {
            throw initFailure(e);
        } catch (NoSuchAlgorithmException e) {
            throw initFailure(e);
        } catch (TimeoutException e) {
            throw initFailure(e);
        }
    }

    /**
     * Logs a resource-preparation failure through the producer's logger (instead of
     * {@code printStackTrace()}) and returns the exception for the caller to throw.
     */
    private ProducerInitException initFailure(Exception cause) {
        getLogger().error(getInstanceName(), cause);
        return new ProducerInitException();
    }

    /**
     * Tells whether this instance may act as the active producer.
     *
     * @return true if the HA guard already holds the token or acquires it with its default
     *         timeout; false (after logging a warning) otherwise
     */
    private boolean isHaLeader() {
        // Short-circuit: only try to take the token when it is not already held.
        final boolean leader = haGuard.hasToken() || haGuard.takeTokenWithDefaultTimeout();
        if (!leader) {
            getLogger().warn("Get HA token timeout, current instance will standby and wait for the next try");
        }
        return leader;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Main loop of the producer thread. Runs until {@code shutDowning} is observed, at which
     * point {@code doDestory()} is called and the method returns.
     *
     * <p>Each iteration: honours a pending suspend request, checks for shutdown, makes sure
     * this instance holds the HA token, makes sure the replication resource is prepared,
     * reads one chunk of event data, lets the subclass parse it, converts the event into a
     * {@link ChangeDataSet} and dispatches it. Failures are logged and lead to the data
     * source being released, so that the next iteration re-prepares it.
     */
    public void threadHandler() {
        // Most recently read raw event bytes; passed to handleException for diagnostics.
        // NOTE(review): not reset per iteration, so when readEventData() itself throws, the
        // bytes handed to handleException are those of the previous successful read.
        byte[] bArr = null;
        // Installs a fresh transaction context via ThreadHolder (presumably bound to this thread — confirm).
        ThreadHolder.setTrxContext(TrxContext.factory());
        while (true) {
            if (this.forceSuspend) {
                blockThreadHandler();
            }
            if (this.shutDowning) {
                doDestory();
                return;
            }
            // When the HA token is not available the loop simply starts over.
            if (isHaLeader()) {
                ensureResource();
                try {
                    bArr = this.dataSource.readEventData();
                    // A successful read resets the socket-timeout handler.
                    this.consequentSocketTimeoutHandler.clean();
                    String str = this.dataSource.getIpAddress() + ":" + this.dataSource.getPort();
                    try {
                        // One-element array used as an out-parameter: procEventData stores the event in slot 0.
                        BaseLogEvent[] baseLogEventArr = {null};
                        if (procEventData(bArr, baseLogEventArr)) {
                            BaseLogEvent baseLogEvent = baseLogEventArr[0];
                            // Re-check shutdown after parsing, before any dispatching happens.
                            if (this.shutDowning) {
                                doDestory();
                                return;
                            }
                            try {
                                ThreadHolder.getTrxContext().handleEvent(baseLogEvent);
                                // A null result means there is nothing to dispatch for this event.
                                ChangeDataSet accept = this.dispatchUnitManager.accept(baseLogEvent, getInstanceName());
                                if (accept != null) {
                                    accept.setBirthTime(baseLogEvent.getEventHeader().getTimestamp());
                                    accept.setSendTime(System.currentTimeMillis());
                                    accept.setSrcDbHost(str);
                                    fillSyncPoint(accept, baseLogEvent);
                                    // Re-check the HA token right before dispatching.
                                    if (this.haGuard.hasToken()) {
                                        handleDispatch(accept);
                                        if (this.enableProfilingPrintInfo) {
                                            PROFLING_INFO.get().doOne(getLogger());
                                        }
                                    } else {
                                        getLogger().info("Lost ha token, don't dispatch message.");
                                        realeaseResource();
                                    }
                                }
                                // NOTE(review): endTrans also runs when the token was lost and nothing was dispatched.
                                endTrans(baseLogEvent);
                            } catch (Throwable th) {
                                // Any failure while handling/dispatching: log and drop the connection.
                                getLogger().error(getInstanceName(), th);
                                realeaseResource();
                            }
                        }
                    } catch (RuntimeException e) {
                        handleException(bArr, e);
                    } catch (Throwable th2) {
                        handleException(bArr, th2);
                    }
                } catch (RuntimeException e2) {
                    handleException(bArr, e2);
                } catch (NormalSocketTimeoutException e3) {
                    // The handler decides whether this timeout means the data source should be dropped.
                    if (this.consequentSocketTimeoutHandler.handleTimeout()) {
                        getLogger().info("Datasource has no change data for a long time, it should switch to another datasource:{}", combineDatasoruceInfo());
                        realeaseResource();
                    } else {
                        getLogger().info("Datasource has no change data, normal time out:{}", combineDatasoruceInfo());
                    }
                } catch (Throwable th3) {
                    handleException(bArr, th3);
                }
            }
        }
    }

    /**
     * Parses raw event bytes read from the data source.
     *
     * @param bArr            bytes returned by the data source's {@code readEventData()}
     * @param baseLogEventArr one-element array used as an out-parameter; the producer loop
     *                        reads the parsed event from index 0 when this method returns true
     * @return true if the producer loop should go on and process {@code baseLogEventArr[0]}
     */
    protected abstract boolean procEventData(byte[] bArr, BaseLogEvent[] baseLogEventArr);

    /**
     * @return the logger used for all messages emitted by this producer
     */
    protected abstract Logger getLogger();

    /**
     * Hook invoked by the producer loop after an event has been handled (and dispatched, if
     * it produced a change set).
     *
     * @param baseLogEvent the event that was just handled
     */
    protected abstract void endTrans(BaseLogEvent baseLogEvent);

    /**
     * Hook invoked by the producer loop on every non-null change set before it is dispatched.
     * Presumably records the replication position on the change set — confirm in subclasses.
     *
     * @param changeDataSet the change set about to be dispatched
     * @param baseLogEvent  the event the change set was produced from
     */
    protected abstract void fillSyncPoint(ChangeDataSet changeDataSet, BaseLogEvent baseLogEvent);

    /**
     * @return the workflow that receives the change sets produced by this instance
     */
    public DispatchWorkflow getDispatcher() {
        return dispatcher;
    }

    /**
     * @param dispatchWorkflow the workflow that should receive produced change sets
     */
    public void setDispatcher(DispatchWorkflow dispatchWorkflow) {
        dispatcher = dispatchWorkflow;
    }

    /**
     * Blocks until the replication resource is prepared or shutdown has been requested.
     *
     * <p>Preparation is retried every {@code repeatResourceInterval} milliseconds. The
     * method returns immediately after a successful attempt; it only sleeps between
     * failed attempts.
     */
    private void ensureResource() {
        while (!this.preparedResource) {
            try {
                getLogger().info("Prepare replication socket...");
                prepareResource();
            } catch (Exception e) {
                // prepareResource() logs before throwing ProducerInitException; anything else
                // would otherwise vanish silently, so log it here.
                if (!(e instanceof ProducerInitException)) {
                    getLogger().error(getInstanceName(), e);
                }
            }
            // Do not delay the caller once the resource is ready or shutdown was requested.
            if (this.preparedResource || this.shutDowning) {
                return;
            }
            try {
                Thread.sleep(this.repeatResourceInterval);
            } catch (InterruptedException ignored) {
                // Deliberately swallowed: restoring the interrupt flag would make every
                // following sleep() fail at once and turn this retry loop into a busy spin.
                // Shutdown is signalled through shutDowning instead.
            }
        }
    }

    /**
     * Hands a finished change set over to the dispatcher.
     *
     * @param changeDataSet the change set to dispatch
     */
    private void handleDispatch(ChangeDataSet changeDataSet) {
        final DispatchWorkflow target = dispatcher;
        target.dispatchEvent(changeDataSet);
    }

    /**
     * Logs a failure of the producer loop and releases the data source so that the next
     * iteration re-prepares it.
     *
     * <p>Besides the throwable and the data source address, the raw event bytes (when
     * available) are logged as signed decimal values, each followed by
     * {@code AbstractTableMatcher.FILTER_SEPARATOR}.
     *
     * @param bArr raw event bytes to dump for diagnostics, or null if none are available
     * @param th   the failure being reported
     */
    protected void handleException(byte[] bArr, Throwable th) {
        final Logger log = getLogger();
        log.error(th.getMessage(), th);
        getLogger().error(combineDatasoruceInfo());

        if (null != bArr) {
            getLogger().error("Print byte data ... ");
            // Method-local buffer: no need for a synchronized StringBuffer.
            final StringBuilder dump = new StringBuilder();
            for (int i = 0; i < bArr.length; i++) {
                dump.append((int) bArr[i]);
                dump.append(AbstractTableMatcher.FILTER_SEPARATOR);
            }
            getLogger().error(dump.toString());
        }

        realeaseResource();
    }

    /**
     * @return a log-friendly description of the current data source of the form
     *         {@code datasource is [ip, port] [<ip>, <port>]}
     */
    private String combineDatasoruceInfo() {
        final StringBuilder info = new StringBuilder("datasource is [ip, port] [");
        info.append(dataSource.getIpAddress());
        info.append(", ");
        info.append(dataSource.getPort());
        info.append("]");
        return info.toString();
    }

    /**
     * Requests shutdown of the producer thread and waits up to 90 seconds for it to release
     * its resources.
     *
     * <p>The producer thread is woken up in case it is parked by a suspend request, so that
     * it can observe the shutdown flag. If the thread does not confirm within the timeout a
     * warning is logged. If the calling thread is interrupted while waiting, the interrupt
     * flag is restored and the method returns.
     */
    @Override // net.neoremind.fountain.producer.SingleProducer
    public void destroy() {
        final CountDownLatch latch = new CountDownLatch(1);
        // Publish the latch before the flag so the producer thread always sees it.
        this.destroyWait = latch;
        this.shutDowning = true;
        // A suspended producer thread waits on this object's monitor; wake it so that it
        // reaches the shutdown check instead of keeping this call blocked until timeout.
        synchronized (this) {
            notifyAll();
        }
        try {
            if (!latch.await(90L, TimeUnit.SECONDS)) {
                getLogger().warn("Producer thread of {} did not stop within 90 seconds", getInstanceName());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            getLogger().error(getInstanceName(), e);
        }
    }

    /**
     * Final step of the producer thread: releases the data source and, if a
     * {@code destroy()} call is waiting, signals its latch.
     */
    private void doDestory() {
        realeaseResource();
        final CountDownLatch waiter = destroyWait;
        if (null == waiter) {
            return;
        }
        waiter.countDown();
    }

    /**
     * Stores the bean name; it serves as the instance name when no slice name is set.
     *
     * @param str the bean name
     */
    public void setBeanName(String str) {
        beanName = str;
    }

    /**
     * @return the slice name when one is set, otherwise the bean name (which may be null)
     */
    protected String getInstanceName() {
        if (sliceName != null) {
            return sliceName;
        }
        return beanName;
    }

    /**
     * Closes the data source (best effort: a failure to close is only logged), marks the
     * resource as not prepared and resets the socket-timeout handler.
     */
    private void realeaseResource() {
        final BinlogDataSource source = dataSource;
        if (source != null) {
            try {
                source.close();
            } catch (Exception closeFailure) {
                getLogger().error(closeFailure.getMessage(), closeFailure);
            }
        }
        preparedResource = false;
        consequentSocketTimeoutHandler.clean();
    }

    /**
     * @return whether dispatched change sets are counted for the periodic throughput log
     */
    public boolean isEnableProfilingPrintInfo() {
        return enableProfilingPrintInfo;
    }

    /**
     * @param z true to count dispatched change sets for the periodic throughput log
     */
    public void setEnableProfilingPrintInfo(boolean z) {
        enableProfilingPrintInfo = z;
    }

    /**
     * @return the strategy consulted when a read ends with a normal socket timeout
     */
    public ConsequentSocketTimeoutHandler getConsequentSocketTimeoutHandler() {
        return consequentSocketTimeoutHandler;
    }

    /**
     * @param consequentSocketTimeoutHandler the strategy to consult when a read ends with a
     *                                       normal socket timeout
     */
    public void setConsequentSocketTimeoutHandler(ConsequentSocketTimeoutHandler consequentSocketTimeoutHandler) {
        this.consequentSocketTimeoutHandler = consequentSocketTimeoutHandler;
    }

    /**
     * @return the slice name, or null if none was set
     */
    public String getSliceName() {
        return sliceName;
    }

    /**
     * @param str the slice name; when non-null it takes precedence over the bean name as
     *            this producer's instance name
     */
    public void setSliceName(String str) {
        sliceName = str;
    }

    /**
     * @return the parser configured for this producer
     */
    public Parser getParser() {
        return parser;
    }

    /**
     * @param parser the parser to use; it receives its table-meta provider when the
     *               producer is started
     */
    public void setParser(Parser parser) {
        this.parser = parser;
    }

    /**
     * @return the data source events are read from
     */
    public BinlogDataSource getDataSource() {
        return dataSource;
    }

    /**
     * @param binlogDataSource the data source to read events from
     */
    public void setDataSource(BinlogDataSource binlogDataSource) {
        dataSource = binlogDataSource;
    }

    /**
     * @return how long, in milliseconds, {@code start()} waits for the producer thread
     */
    public int getThreadStartTimeout() {
        return threadStartTimeout;
    }

    /**
     * @param i how long, in milliseconds, {@code start()} should wait for the producer thread
     */
    public void setThreadStartTimeout(int i) {
        threadStartTimeout = i;
    }

    /**
     * @return the latch the producer thread counts down once it has registered with the
     *         dispatcher
     */
    public CountDownLatch getThreadStartWait() {
        return threadStartWait;
    }

    /**
     * @return the pause, in milliseconds, between attempts to prepare the replication resource
     */
    public int getRepeatResourceInterval() {
        return repeatResourceInterval;
    }

    /**
     * @param i the pause, in milliseconds, between attempts to prepare the replication resource
     */
    public void setRepeatResourceInterval(int i) {
        repeatResourceInterval = i;
    }

    /**
     * @return the manager that turns log events into change sets
     */
    public DispatchUnitManager getDispatchUnitManager() {
        return dispatchUnitManager;
    }

    /**
     * @param dispatchUnitManager the manager that should turn log events into change sets
     */
    public void setDispatchUnitManager(DispatchUnitManager dispatchUnitManager) {
        this.dispatchUnitManager = dispatchUnitManager;
    }

    /**
     * @return the configured event matcher
     */
    public EventMatcher getMatcher() {
        return matcher;
    }

    /**
     * @param eventMatcher the event matcher to store
     */
    public void setMatcher(EventMatcher eventMatcher) {
        matcher = eventMatcher;
    }

    /**
     * @return the database charset name, or null if none was configured
     */
    public String getDbCharset() {
        return dbCharset;
    }

    /**
     * @param str the database charset name; when non-null it is applied on the producer
     *            thread at start-up
     */
    public void setDbCharset(String str) {
        dbCharset = str;
    }

    /**
     * @return the transaction policy, or null if none was configured
     */
    public TransactionPolicy getTransactionPolicy() {
        return transactionPolicy;
    }

    /**
     * @param transactionPolicy the policy to install on the dispatch unit manager when the
     *                          producer is started; null leaves the manager untouched
     */
    public void setTransactionPolicy(TransactionPolicy transactionPolicy) {
        this.transactionPolicy = transactionPolicy;
    }

    /**
     * @return the HA guard consulted before reading and dispatching
     */
    public HaGuard getHaGuard() {
        return haGuard;
    }

    /**
     * @param haGuard the HA guard to consult before reading and dispatching
     */
    public void setHaGuard(HaGuard haGuard) {
        this.haGuard = haGuard;
    }

    /**
     * @return true while the replication resource is marked as prepared, i.e. after a
     *         successful preparation and before the next release
     */
    public boolean isPreparedResource() {
        return preparedResource;
    }

    /**
     * Asks the producer thread to pause; the request takes effect the next time the
     * producer loop checks the flag.
     */
    @Override // net.neoremind.fountain.producer.able.Suspendable
    public void suspend() {
        forceSuspend = true;
    }

    /**
     * Clears the suspend request and wakes every thread waiting on this producer's monitor.
     */
    @Override // net.neoremind.fountain.producer.able.Resumable
    public synchronized void resume() {
        forceSuspend = false;
        this.notifyAll();
    }

    /**
     * Parks the calling (producer) thread while a suspend request is pending.
     *
     * <p>The suspend flag is re-checked under this object's monitor and in a loop. This
     * closes the window in which {@code resume()} could run between the caller's check and
     * the {@code wait()} (a lost notification that would block forever), and it tolerates
     * spurious wakeups. Waiting also ends once shutdown has been requested.
     *
     * <p>If the thread is interrupted while waiting, the interruption is logged and the
     * method returns.
     */
    protected synchronized void blockThreadHandler() {
        if (!this.forceSuspend || this.shutDowning) {
            // Already resumed (or shutting down) by the time the monitor was acquired.
            return;
        }
        getLogger().warn("Blocking MySQL sync thread since `forceSuspend` is set to true");
        try {
            while (this.forceSuspend && !this.shutDowning) {
                wait();
            }
        } catch (InterruptedException e) {
            // The interrupt flag is intentionally not restored: the producer loop would call
            // this method again right away and wait() would then fail immediately, spinning.
            getLogger().error(e.getMessage(), e);
        }
    }
}
