package cn.ymatrix.worker;

import cn.ymatrix.api.StatusCode;
import cn.ymatrix.apiclient.Result;
import cn.ymatrix.apiclient.ResultStatus;
import cn.ymatrix.apiserver.SendDataListener;
import cn.ymatrix.apiserver.SendDataResult;
import cn.ymatrix.builder.RequestType;
import cn.ymatrix.cache.Cache;
import cn.ymatrix.concurrencycontrol.WorkerPool;
import cn.ymatrix.data.Tuples;
import cn.ymatrix.data.TuplesConsumeResultConvertor;
import cn.ymatrix.exception.RetryException;
import cn.ymatrix.faulttolerance.CircuitBreakerFactory;
import cn.ymatrix.httpclient.DataSendingGRPCTask;
import cn.ymatrix.httpclient.HttpTask;
import cn.ymatrix.logger.MxLogger;
import cn.ymatrix.messagecenter.ResultMessageCenter;
import cn.ymatrix.messagecenter.ResultMessageQueue;
import cn.ymatrix.utils.StrUtil;
import cn.ymatrix.utils.ThroughoutCalculator;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;

/* loaded from: input_file:cn/ymatrix/worker/TuplesConsumer.class */
public class TuplesConsumer extends TaskWorker {
    private static final String TAG = StrUtil.logTagWrap(TuplesConsumer.class.getName());
    private WorkerPool tasksPool;
    private final AtomicBoolean dropAll = new AtomicBoolean(false);
    private RequestType requestType = RequestType.WithGRPC;
    private boolean useAsyncRequest;
    private final SendDataListener listener;

    /* loaded from: input_file:cn/ymatrix/worker/TuplesConsumer$SendDataListenerImpl.class */
    private static class SendDataListenerImpl implements SendDataListener {
        private static final Logger l = MxLogger.init(SendDataListenerImpl.class);

        @Override // cn.ymatrix.apiserver.SendDataListener
        public void onSuccess(SendDataResult sendDataResult, Tuples tuples) {
            if (tuples != null) {
                try {
                    ResultMessageQueue<Result> fetch = ResultMessageCenter.getSingleInstance().fetch(tuples.getSenderID());
                    if (fetch != null && sendDataResult != null) {
                        String connect = StrUtil.connect(sendDataResult.getMsg(), " with ", String.valueOf(tuples.size()), " lines.");
                        Result result = new Result();
                        result.setMsg(connect);
                        result.setSucceedLines(TuplesConsumeResultConvertor.convertSucceedTuplesLines(null, tuples));
                        result.setSuccessLinesSerialNumList(TuplesConsumeResultConvertor.convertSuccessfulSerialNums(sendDataResult.getErrorLinesMap(), tuples));
                        result.setStatus(ResultStatus.SUCCESS);
                        result.setRawTuples(tuples);
                        fetch.add(result);
                    }
                } catch (Exception e) {
                    l.error("{} Send data blocking onSuccess callback exception ", TuplesConsumer.TAG, e);
                }
            }
        }

        @Override // cn.ymatrix.apiserver.SendDataListener
        public void onFailure(SendDataResult sendDataResult, Tuples tuples) {
            if (tuples != null) {
                try {
                    ResultMessageQueue<Result> fetch = ResultMessageCenter.getSingleInstance().fetch(tuples.getSenderID());
                    if (fetch != null && sendDataResult != null) {
                        Result result = new Result();
                        result.setMsg(sendDataResult.getMsg());
                        if (sendDataResult.getCode() == StatusCode.ALL_TUPLES_FAIL) {
                            result.setErrorTuplesMap(TuplesConsumeResultConvertor.convertErrorTuples(null, tuples, sendDataResult.getMsg()));
                            result.setSucceedLines(0);
                        } else {
                            result.setErrorTuplesMap(TuplesConsumeResultConvertor.convertErrorTuples(sendDataResult.getErrorLinesMap(), tuples, sendDataResult.getMsg()));
                            result.setSucceedLines(TuplesConsumeResultConvertor.convertSucceedTuplesLines(sendDataResult.getErrorLinesMap(), tuples));
                            result.setSuccessLinesSerialNumList(TuplesConsumeResultConvertor.convertSuccessfulSerialNums(sendDataResult.getErrorLinesMap(), tuples));
                        }
                        result.setStatus(ResultStatus.FAILURE);
                        result.setRawTuples(tuples);
                        fetch.add(result);
                    }
                } catch (Exception e) {
                    l.error("{} Send data blocking onFailure callback exception ", TuplesConsumer.TAG, e);
                }
            }
        }
    }

    public TuplesConsumer() throws NullPointerException {
        this.stop.set(false);
        this.listener = new SendDataListenerImpl();
    }

    public RequestType getRequestType() {
        return this.requestType;
    }

    public void setRequestType(RequestType requestType) {
        if (requestType == null) {
            return;
        }
        this.requestType = requestType;
    }

    public void dropAll(boolean z) {
        this.dropAll.set(z);
    }

    public void useAsyncRequest(boolean z) {
        this.useAsyncRequest = z;
    }

    public void poolToDispatchTask(WorkerPool workerPool) {
        this.tasksPool = workerPool;
    }

    public void consume(final Cache cache) throws NullPointerException {
        if (cache == null) {
            throw new NullPointerException("register on a null cache");
        }
        this.workerPool.join(new Runnable() { // from class: cn.ymatrix.worker.TuplesConsumer.1
            @Override // java.lang.Runnable
            public void run() {
                while (!TuplesConsumer.this.stop.get()) {
                    try {
                        TaskWorker.l.info("{} Before consume cache size {}", TuplesConsumer.TAG, Integer.valueOf(cache.size()));
                        Tuples tuples = cache.get();
                        if (TuplesConsumer.this.dropAll.get()) {
                            TaskWorker.l.info("{} Drop tuples in consumer mode.", TuplesConsumer.TAG);
                            ThroughoutCalculator.getInstance().add(tuples);
                        } else if (TuplesConsumer.this.tuplesValidation(tuples)) {
                            while (CircuitBreakerFactory.getInstance().isCircuitBreak(tuples.getTarget().getURL(), tuples.getSchema(), tuples.getTable())) {
                                try {
                                    TaskWorker.l.warn("{} Circuit breaker open, stop sending data to {}.{}", TuplesConsumer.TAG, tuples.getSchema(), tuples.getTable());
                                    Thread.sleep(100L);
                                } catch (Exception e) {
                                    TaskWorker.l.warn("{} failed to make thread sleep during circuit-breaking: {}", TuplesConsumer.TAG, e.getMessage());
                                }
                            }
                            TuplesConsumer.this.sendTuples(tuples);
                        }
                    } catch (Exception e2) {
                        TaskWorker.l.error("{} Consume tuples from cache exception: {}.", TuplesConsumer.TAG, e2);
                    }
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean tuplesValidation(Tuples tuples) {
        try {
            tuplesNullableCheck(tuples);
            return true;
        } catch (NullPointerException e) {
            handleInvalidTuples(tuples);
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleInvalidTuples(Tuples tuples) {
        if (tuples == null) {
            l.error("{} Invalid nullable tuples", TAG);
        } else {
            this.listener.onFailure(handleAllTuplesFailure(tuples.getSchema(), tuples.getTable()), tuples);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendTuples(Tuples tuples) {
        try {
            if (getRequestType() == RequestType.WithHTTP) {
                sendTuplesHttp(tuples);
            } else {
                sendTuplesGRPC(tuples);
            }
        } catch (Exception e) {
            if (tuples != null) {
                l.error("{} send tuples in consumer with exception", TAG, e);
                handleInvalidTuples(tuples);
            }
        }
    }

    private SendDataResult handleAllTuplesFailure(String str, String str2) {
        return new SendDataResult(StatusCode.ALL_TUPLES_FAIL, null, "All tuples fail for table " + str + "." + str2);
    }

    private void tuplesNullableCheck(Tuples tuples) throws NullPointerException {
        if (tuples == null) {
            throw new NullPointerException("create DataSendingGRPCTask on a null tuples.");
        }
        if (tuples.getTarget() == null || tuples.getTarget().getURL() == null) {
            throw new NullPointerException("create DataSendingGRPCTask on a null tuples target.");
        }
        if (tuples.getSchema() == null) {
            throw new NullPointerException("create DataSendingGRPCTask on a null tuples schema.");
        }
        if (tuples.getTable() == null) {
            throw new NullPointerException("create DataSendingGRPCTask on a null tuples table.");
        }
    }

    private void sendTuplesHttp(Tuples tuples) {
        HttpTask generateHTTPTask = generateHTTPTask(tuples, this.retryConfig != null);
        generateHTTPTask.registerListener(this.listener);
        if (this.retryConfig == null) {
            this.tasksPool.join(generateHTTPTask);
            return;
        }
        generateHTTPTask.withRetry(this.retryConfig);
        Runnable decorateTaskWithRetry = decorateTaskWithRetry("SendTuplesHTTP", generateHTTPTask);
        if (decorateTaskWithRetry != null) {
            this.tasksPool.join(decorateTaskWithRetry);
        } else {
            l.error("{} Get an empty retry task to join for send tuples with HTTP {}.", TAG, tuples);
        }
    }

    private HttpTask generateHTTPTask(final Tuples tuples, final boolean z) {
        return new HttpTask(this.circuitBreakerConfig) { // from class: cn.ymatrix.worker.TuplesConsumer.2
            @Override // java.lang.Runnable
            public void run() throws RetryException {
                try {
                    if (TuplesConsumer.this.useAsyncRequest) {
                        requestAsync(tuples);
                    } else {
                        requestBlocking(tuples);
                    }
                } catch (RetryException e) {
                    if (z) {
                        throw e;
                    }
                    TaskWorker.l.error("{} Send tuples raw HTTP with Exception for table {}.{}", TuplesConsumer.TAG, tuples.getSchema(), tuples.getTable(), e);
                } catch (NullPointerException e2) {
                    TuplesConsumer.this.handleInvalidTuples(tuples);
                } catch (Exception e3) {
                    TaskWorker.l.error("{} Send tuples HTTP with unexpected Exception for table {}.{}", TuplesConsumer.TAG, tuples.getSchema(), tuples.getTable(), e3);
                }
            }
        };
    }

    private void sendTuplesGRPC(Tuples tuples) throws NullPointerException {
        DataSendingGRPCTask generateTaskGRPC = generateTaskGRPC(tuples, this.retryConfig != null);
        generateTaskGRPC.registerListener(this.listener);
        if (this.retryConfig == null) {
            this.tasksPool.join(generateTaskGRPC);
            return;
        }
        generateTaskGRPC.withRetry(this.retryConfig);
        Runnable decorateTaskWithRetry = decorateTaskWithRetry("SendTuplesGRPC", generateTaskGRPC);
        if (decorateTaskWithRetry == null) {
            l.error("{} Get an empty retry task to join for send tuples with gRPC {}.", TAG, tuples);
        }
        this.tasksPool.join(decorateTaskWithRetry);
    }

    private DataSendingGRPCTask generateTaskGRPC(final Tuples tuples, final boolean z) throws NullPointerException {
        return new DataSendingGRPCTask(tuples.getTarget().getURL(), tuples.getSchema(), tuples.getTable(), tuples.getTarget().getTimeout(), this.circuitBreakerConfig) { // from class: cn.ymatrix.worker.TuplesConsumer.3
            @Override // java.lang.Runnable
            public void run() throws RetryException {
                try {
                    if (TuplesConsumer.this.useAsyncRequest) {
                        sendTuplesAsync(tuples);
                    } else {
                        sendTuplesBlocking(tuples);
                    }
                } catch (RetryException e) {
                    if (z) {
                        throw e;
                    }
                    TaskWorker.l.error("{} Send tuples raw gRCP with Exception for table {}.{}", TuplesConsumer.TAG, tuples.getSchema(), tuples.getTable(), e);
                } catch (NullPointerException e2) {
                    TuplesConsumer.this.handleInvalidTuples(tuples);
                } catch (Exception e3) {
                    TaskWorker.l.error("{} Send tuples gRPC raw with unexpected Exception for table {}.{}", TuplesConsumer.TAG, tuples.getSchema(), tuples.getTable(), e3);
                }
            }
        };
    }
}
