package com.alicloud.openservices.tablestore.writer.handle;

import com.alicloud.openservices.tablestore.AsyncClientInterface;
import com.alicloud.openservices.tablestore.ClientException;
import com.alicloud.openservices.tablestore.TableStoreCallback;
import com.alicloud.openservices.tablestore.model.RowChange;
import com.alicloud.openservices.tablestore.writer.RequestManager;
import com.alicloud.openservices.tablestore.writer.RequestWithGroups;
import com.alicloud.openservices.tablestore.writer.RowChangeEvent;
import com.alicloud.openservices.tablestore.writer.RowChangeWithGroup;
import com.alicloud.openservices.tablestore.writer.RowWriteResult;
import com.alicloud.openservices.tablestore.writer.WriterConfig;
import com.alicloud.openservices.tablestore.writer.config.BucketConfig;
import com.lmax.disruptor.EventHandler;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alicloud/openservices/tablestore/writer/handle/RowEventHandler.class */
public class RowEventHandler implements EventHandler<RowChangeEvent> {
    private Logger logger = LoggerFactory.getLogger(RowEventHandler.class);
    private AsyncClientInterface ots;
    private int concurrency;
    private int bucketConcurrency;
    private WriterConfig writerConfig;
    private BucketConfig bucketConfig;
    private TableStoreCallback<RowChange, RowWriteResult> callback;
    private Executor executor;
    private WriterHandleStatistics writerStatistics;
    private Semaphore callbackSemaphore;
    private Semaphore bucketSemaphore;
    private RequestManager requestManager;

    public RowEventHandler(AsyncClientInterface asyncClientInterface, BucketConfig bucketConfig, WriterConfig writerConfig, TableStoreCallback<RowChange, RowWriteResult> tableStoreCallback, Executor executor, WriterHandleStatistics writerHandleStatistics, Semaphore semaphore) {
        this.ots = asyncClientInterface;
        this.concurrency = writerConfig.getConcurrency();
        this.bucketConfig = bucketConfig;
        this.callbackSemaphore = semaphore;
        this.callback = tableStoreCallback;
        this.executor = executor;
        this.writerStatistics = writerHandleStatistics;
        this.writerConfig = writerConfig;
        switch (writerConfig.getWriteMode()) {
            case SEQUENTIAL:
                this.bucketConcurrency = 1;
                break;
            case PARALLEL:
            default:
                this.bucketConcurrency = this.concurrency;
                break;
        }
        this.bucketSemaphore = new Semaphore(this.bucketConcurrency);
        initRequestManager();
    }

    private void initRequestManager() {
        switch (this.writerConfig.getBatchRequestType()) {
            case BULK_IMPORT:
                this.requestManager = new BulkImportRequestManager(this.ots, this.writerConfig, this.bucketConfig, this.executor, this.writerStatistics, this.callback, this.callbackSemaphore, this.bucketSemaphore);
                return;
            case BATCH_WRITE_ROW:
            default:
                this.requestManager = new BatchWriteRowRequestManager(this.ots, this.writerConfig, this.bucketConfig, this.executor, this.writerStatistics, this.callback, this.callbackSemaphore, this.bucketSemaphore);
                return;
        }
    }

    public void setCallback(TableStoreCallback<RowChange, RowWriteResult> tableStoreCallback) {
        this.callback = tableStoreCallback;
        initRequestManager();
    }

    @Override // com.lmax.disruptor.EventHandler
    public void onEvent(RowChangeEvent rowChangeEvent, long j, boolean z) throws Exception {
        boolean z2 = false;
        CountDownLatch countDownLatch = null;
        RequestWithGroups requestWithGroups = null;
        if (rowChangeEvent.type == RowChangeEvent.EventType.FLUSH) {
            this.logger.debug("FlushSignal with QueueSize: {}", Integer.valueOf(this.requestManager.getTotalRowsCount()));
            if (this.requestManager.getTotalRowsCount() > 0) {
                requestWithGroups = this.requestManager.makeRequest();
            }
            z2 = true;
            countDownLatch = rowChangeEvent.latch;
        } else {
            this.writerStatistics.totalRowsCount.incrementAndGet();
            final RowChange rowChange = rowChangeEvent.rowChange;
            final RowChangeWithGroup rowChangeWithGroup = new RowChangeWithGroup(rowChangeEvent.rowChange, rowChangeEvent.group);
            if (!this.requestManager.appendRowChange(rowChangeWithGroup)) {
                requestWithGroups = this.requestManager.makeRequest();
                if (!this.requestManager.appendRowChange(rowChangeWithGroup)) {
                    this.executor.execute(new Runnable() { // from class: com.alicloud.openservices.tablestore.writer.handle.RowEventHandler.1
                        @Override // java.lang.Runnable
                        public void run() {
                            RowEventHandler.this.writerStatistics.totalFailedRowsCount.incrementAndGet();
                            ClientException clientException = new ClientException("Can not even append only one row into buffer.");
                            RowEventHandler.this.logger.error("RowChange Failed: ", clientException);
                            rowChangeWithGroup.group.failedOneRow(rowChangeWithGroup.rowChange, clientException);
                            if (RowEventHandler.this.callback != null) {
                                RowEventHandler.this.callback.onFailed(rowChange, clientException);
                            }
                        }
                    });
                }
            }
        }
        if (requestWithGroups != null) {
            final RequestWithGroups requestWithGroups2 = requestWithGroups;
            this.bucketSemaphore.acquire();
            this.callbackSemaphore.acquire();
            this.executor.execute(new Runnable() { // from class: com.alicloud.openservices.tablestore.writer.handle.RowEventHandler.2
                @Override // java.lang.Runnable
                public void run() {
                    RowEventHandler.this.writerStatistics.totalRequestCount.incrementAndGet();
                    RowEventHandler.this.requestManager.sendRequest(requestWithGroups2);
                }
            });
        }
        if (z2) {
            this.bucketSemaphore.acquire(this.bucketConcurrency);
            this.bucketSemaphore.release(this.bucketConcurrency);
            this.logger.debug("Finish bucket waitFlush.");
            countDownLatch.countDown();
        }
    }
}
