package org.apache.iotdb.db.sync.externalpipe;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.lang3.Validate;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.sync.datasource.PipeOpManager;
import org.apache.iotdb.db.sync.datasource.PipeStorageGroupInfo;
import org.apache.iotdb.db.sync.externalpipe.ExtPipePluginConfiguration;
import org.apache.iotdb.db.sync.externalpipe.operation.DeleteOperation;
import org.apache.iotdb.db.sync.externalpipe.operation.InsertOperation;
import org.apache.iotdb.db.sync.externalpipe.operation.Operation;
import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
import org.apache.iotdb.pipe.external.api.ExternalPipeSinkWriterStatus;
import org.apache.iotdb.pipe.external.api.IExternalPipeSinkWriter;
import org.apache.iotdb.pipe.external.api.IExternalPipeSinkWriterFactory;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/sync/externalpipe/ExtPipePlugin.class */
public class ExtPipePlugin {
    /** Class-wide logger; the nested DataTransmissionTask logs through it as well. */
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) ExtPipePlugin.class);
    /** Plugin type name; used in log messages, in the worker thread-pool name and to look up the writer factory. */
    private String extPipeTypeName;
    /** Raw sink parameters; read by getIntParam() and passed to the writer factory's initialize(). */
    Map<String, String> sinkParams;
    /** Source of operations, storage-group names and committed/next indexes for the workers. */
    private PipeOpManager pipeOpManager;
    /** Manager that is asked to trigger a commit once an operation block needs committing. */
    private ExtPipePluginManager extPipePluginManager;
    /** Factory for sink writers; may be injected via the setter, otherwise resolved in start(). */
    private IExternalPipeSinkWriterFactory pipeSinkWriterFactory;
    /** Runtime configuration (thread count, batch size, retry settings); built in start(). */
    private ExtPipePluginConfiguration configuration;
    /** One task per worker thread, indexed by thread index; created in start(). */
    private List<DataTransmissionTask> dataTransmissionTasks;
    /** Thread pool running the transmission tasks; created in start(), shut down in stop(). */
    private ExecutorService executorService;
    /** Failure counters: writer operation name -> exception message -> number of occurrences. */
    private Map<String, Map<String, AtomicInteger>> writerInvocationFailures;
    /** Value every timestamp is divided by before it is handed to the sink writer; set in the constructor. */
    private int timestampDivisor;
    /** Run flag polled by the worker loops; set to true in start() and to false in stop(). */
    private volatile boolean alive = false;
    /** Storage-group name -> last index passed to DataTransmissionTask.commitData(). */
    private Map<String, Long> dataCommitMap = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iotdb/db/sync/externalpipe/ExtPipePlugin$DataTransmissionTask.class */
    public class DataTransmissionTask implements Callable<Void> {
        /** Sink writer owned by this task; opened in the constructor and closed when call() exits. */
        private final IExternalPipeSinkWriter writer;
        /** Index of this task; compared with getThreadIndex(sgName) to decide which storage groups it serves. */
        private final int threadIndex;
        /** Configuration passed in at construction; supplies batch size, attempt count and back-off interval. */
        private final ExtPipePluginConfiguration configuration;
        /** Per-storage-group read/commit bookkeeping, obtained from configuration.getBucketSgInfoMap(threadIndex). */
        private Map<String, PipeStorageGroupInfo> sgInfoMap;
        /** End index of the most recently processed operation (assigned in call()). */
        private long nextIndex;
        /** Storage group most recently returned by waitForOperations(); checked first on the next call. */
        private String lastReadSgName;
        /** Monitor object used for wait()/notifyAll() between notifyNewDataArrive() and waitForOperations(). */
        private byte[] newDataLocker = new byte[0];
        /** Number of notifications received since waitForOperations() last reset it; guarded by newDataLocker. */
        private long newDataCounter = 0;

        DataTransmissionTask(IExternalPipeSinkWriter iExternalPipeSinkWriter, int i, ExtPipePluginConfiguration extPipePluginConfiguration) throws IOException {
            this.writer = (IExternalPipeSinkWriter) Validate.notNull(iExternalPipeSinkWriter);
            this.threadIndex = i;
            this.configuration = extPipePluginConfiguration;
            this.sgInfoMap = extPipePluginConfiguration.getBucketSgInfoMap(i);
            this.writer.open();
        }

        /**
         * Returns the index this task should read next for the given storage group.
         *
         * <p>When the group has no bookkeeping entry yet, one is created starting right after the
         * index reported by {@code pipeOpManager.getCommittedIndex}.
         *
         * @param str storage-group name
         * @return next index to read for that storage group
         */
        private long getSgNextDataIndex(String str) {
            PipeStorageGroupInfo known = this.sgInfoMap.get(str);
            if (known == null) {
                long firstUnread = ExtPipePlugin.this.pipeOpManager.getCommittedIndex(str) + 1;
                PipeStorageGroupInfo created = new PipeStorageGroupInfo(str, firstUnread - 1, firstUnread);
                this.sgInfoMap.put(str, created);
                return firstUnread;
            }
            return known.getNextReadIndex();
        }

        /**
         * Records the next-read index and the committed index for a storage group, creating the
         * bookkeeping entry first when the group has none.
         *
         * @param str storage-group name
         * @param j new next-read index
         * @param j2 new committed index
         */
        private void setSgNextDataIndex(String str, long j, long j2) {
            ExtPipePlugin.logger.debug("setSgNextDataIndex(), sgName={}, nextReadIndex={}, committedIndex={}.", str, j, j2);
            PipeStorageGroupInfo info = this.sgInfoMap.computeIfAbsent(str, missingSg -> new PipeStorageGroupInfo(str, -1L, 0L));
            info.setNextReadIndex(j);
            info.setCommittedIndex(j2);
        }

        /**
         * Publishes the committed index of a storage group and, when the op manager reports that the
         * operation block needs committing, asks the plugin manager to trigger the commit.
         *
         * @param str storage-group name
         * @param j index up to which data has been handled
         * @throws IOException declared by the original signature; presumably raised by the commit trigger
         */
        private void commitData(String str, long j) throws IOException {
            ExtPipePlugin.logger.debug("commitData(), sgName={}, committedIndex={}.", str, j);
            ExtPipePlugin.this.dataCommitMap.put(str, j);
            boolean blockNeedsCommit = ExtPipePlugin.this.pipeOpManager.opBlockNeedCommit(str, j);
            if (!blockNeedsCommit) {
                return;
            }
            ExtPipePlugin.this.extPipePluginManager.triggerCommit(str, j);
        }

        /**
         * Tells whether a storage group has operations this task has not read yet.
         *
         * @param str storage-group name
         * @return {@code true} when the group has no bookkeeping entry, or when the op manager's next
         *     index is greater than this task's next-read index
         */
        private boolean sgHasNewData(String str) {
            PipeStorageGroupInfo info = this.sgInfoMap.get(str);
            return info == null
                    || ExtPipePlugin.this.pipeOpManager.getNextIndex(str) > info.getNextReadIndex();
        }

        /**
         * Wakes this task up because new data has arrived. The arguments are not used here; only the
         * notification counter is bumped and every thread waiting on the monitor is notified.
         *
         * @param str storage-group name (unused)
         * @param j begin index of the new data (unused)
         * @param j2 amount of new data (unused)
         */
        public void notifyNewDataArrive(String str, long j, long j2) {
            final byte[] monitor = this.newDataLocker;
            synchronized (monitor) {
                this.newDataCounter += 1;
                monitor.notifyAll();
            }
        }

        /**
         * Waits until a storage group served by this task has unread data.
         *
         * <p>The group returned by the previous call is checked first. Otherwise all groups known to
         * the op manager are scanned; when none qualifies the task waits on the monitor (at most
         * 15 seconds per round, skipped if a notification already arrived) and scans again.
         *
         * @return name of a storage group with unread data, or {@code null} once the plugin is no
         *     longer alive
         * @throws InterruptedException declared by the signature; an interrupt during the wait is
         *     swallowed here and simply causes another scan
         */
        public String waitForOperations() throws InterruptedException {
            String previousSg = this.lastReadSgName;
            if (previousSg != null) {
                if (sgHasNewData(previousSg)) {
                    return previousSg;
                }
                this.lastReadSgName = null;
            }
            while (ExtPipePlugin.this.alive) {
                for (String candidateSg : ExtPipePlugin.this.pipeOpManager.getSgSet()) {
                    boolean servedByThisTask = this.threadIndex == ExtPipePlugin.this.getThreadIndex(candidateSg);
                    if (servedByThisTask && sgHasNewData(candidateSg)) {
                        this.lastReadSgName = candidateSg;
                        return candidateSg;
                    }
                }
                synchronized (this.newDataLocker) {
                    // Only sleep when no notification came in since the counter was last reset.
                    if (this.newDataCounter <= 0) {
                        try {
                            this.newDataLocker.wait(15000L);
                        } catch (InterruptedException ignored) {
                            // Intentionally ignored: the enclosing loop re-checks the alive flag.
                        }
                    }
                    this.newDataCounter = 0L;
                }
            }
            return null;
        }

        /**
         * Worker loop of this task.
         *
         * <p>While the plugin is alive it waits for a storage group with unread data, fetches the next
         * batch of operations, pushes it to the sink writer (with retries), flushes the writer (with
         * retries), then advances the read/commit bookkeeping and commits. A batch that still fails
         * after all attempts is logged and skipped so that it cannot block the pipe.
         *
         * <p>If the plugin is stopped while a batch is in flight and that batch was not delivered, the
         * loop exits without committing it. Otherwise {@code handleOperationWithRetry} would return
         * {@code false} without a single attempt and the undelivered batch would be committed.
         *
         * <p>An {@link IOException} raised while reading or committing is logged and followed by one
         * back-off interval instead of being silently dropped, so a persistent failure neither goes
         * unnoticed nor turns into a hot spin.
         *
         * <p>The writer is closed when the loop ends.
         *
         * @return always {@code null}
         * @throws Exception declared by {@link Callable}; exceptions raised inside the loop are
         *     caught and logged there
         */
        @Override // java.util.concurrent.Callable
        public Void call() throws Exception {
            ExtPipePlugin.logger.info("ExternalPipeWorker start. thread={}.", Thread.currentThread().getName());
            while (ExtPipePlugin.this.alive) {
                try {
                    String sgName = waitForOperations();
                    if (sgName == null) {
                        continue;
                    }
                    try {
                        Operation operation = ExtPipePlugin.this.pipeOpManager.getOperation(sgName, getSgNextDataIndex(sgName), this.configuration.getOperationBatchSize());
                        if (operation == null || operation.getDataCount() <= 0) {
                            continue;
                        }
                        if (!handleOperationWithRetry(sgName, operation)) {
                            if (!ExtPipePlugin.this.alive) {
                                // Stopped before the batch was delivered: leave it uncommitted so it
                                // is read again on the next start.
                                break;
                            }
                            ExtPipePlugin.logger.error("Failed to handle operation after {} attempts: {}", this.configuration.getAttemptTimes(), operation);
                        }
                        if (!flushWithRetry()) {
                            ExtPipePlugin.logger.error("Failed to flush operations after {} attempts: startIndex={},endIndex={}", this.configuration.getAttemptTimes(), operation.getStartIndex(), operation.getEndIndex());
                        }
                        this.nextIndex = operation.getEndIndex();
                        setSgNextDataIndex(sgName, this.nextIndex, this.nextIndex - 1);
                        commitData(sgName, this.nextIndex - 1);
                    } catch (IOException e) {
                        ExtPipePlugin.logger.error("IOException while reading or committing operations, sgName={}", sgName, e);
                        // Back off before the next round; an interrupt here is handled by the outer catch.
                        Thread.sleep(this.configuration.getBackOffInterval());
                    }
                } catch (InterruptedException ignored) {
                    // Intentionally ignored: the loop condition re-checks the alive flag.
                } catch (Exception e) {
                    ExtPipePlugin.logger.error("Unexpected system exception", e);
                }
            }
            try {
                this.writer.close();
            } catch (IOException e) {
                ExtPipePlugin.this.handleExceptionsThrownByWriter("close", e);
                ExtPipePlugin.logger.info("Exception happened when closing the writer", e);
            }
            ExtPipePlugin.logger.info("ExternalPipeWorker exits. Thread={}", Thread.currentThread().getName());
            return null;
        }

        /**
         * Returns the status reported by this task's sink writer.
         *
         * @return result of {@code writer.getStatus()}
         */
        public ExternalPipeSinkWriterStatus getStatus() {
            return this.writer.getStatus();
        }

        /**
         * Pushes one operation to the sink, retrying on any exception.
         *
         * <p>At most {@code configuration.getAttemptTimes()} attempts are made, and only while the
         * plugin is alive. Every failure is logged, counted through
         * {@code handleExceptionsThrownByWriter} and followed by one back-off sleep.
         *
         * @param str storage-group name the operation belongs to
         * @param operation operation to push
         * @return {@code true} if an attempt succeeded, {@code false} otherwise (including the case
         *     where no attempt was made because the plugin is not alive)
         */
        private boolean handleOperationWithRetry(String str, Operation operation) {
            for (int remaining = this.configuration.getAttemptTimes(); ExtPipePlugin.this.alive && remaining > 0; remaining--) {
                try {
                    pushOperationToExtPipe(str, operation);
                    return true;
                } catch (Exception failure) {
                    ExtPipePlugin.logger.error("When handle operation {}, Exception", operation.getOperationType(), failure);
                    ExtPipePlugin.this.handleExceptionsThrownByWriter(operation.getOperationTypeName(), failure);
                    try {
                        Thread.sleep(this.configuration.getBackOffInterval());
                    } catch (InterruptedException ignored) {
                        // Intentionally ignored: the loop condition re-checks the alive flag.
                    }
                }
            }
            return false;
        }

        /**
         * Dispatches an operation to the handler matching its concrete type.
         *
         * @param str storage-group name the operation belongs to
         * @param operation an {@code InsertOperation} or a {@code DeleteOperation}
         * @throws IOException propagated from the insert/delete handler
         * @throws IllegalArgumentException if the operation is of neither type
         */
        private void pushOperationToExtPipe(String str, Operation operation) throws IOException, IllegalArgumentException {
            if (operation instanceof InsertOperation) {
                handleInsertOperation(str, (InsertOperation) operation);
                return;
            }
            if (operation instanceof DeleteOperation) {
                handleDeleteOperation(str, (DeleteOperation) operation);
                return;
            }
            ExtPipePlugin.logger.error("pushOperationToExtPipe(), Unrecognized Operation: {}", operation);
            throw new IllegalArgumentException("pushOperationToExtPipe(), Unrecognized Operation:" + operation);
        }

        /**
         * Writes every non-null data point of an insert operation to the sink writer, choosing the
         * writer method by the point's data type. Timestamps are divided by the plugin's timestamp
         * divisor before being written.
         *
         * @param str storage-group name passed through to the writer
         * @param insertOperation operation whose data list is written
         * @throws IOException propagated from the writer's insert methods
         * @throws IllegalArgumentException if a point has a data type other than BOOLEAN, INT32,
         *     INT64, FLOAT, DOUBLE or TEXT
         */
        private void handleInsertOperation(String str, InsertOperation insertOperation) throws IOException, IllegalArgumentException {
            for (Pair<MeasurementPath, List<TimeValuePair>> seriesData : insertOperation.getDataList()) {
                MeasurementPath seriesPath = seriesData.left;
                for (TimeValuePair point : seriesData.right) {
                    if (point == null) {
                        continue;
                    }
                    String[] pathNodes = seriesPath.getNodes();
                    long sinkTimestamp = point.getTimestamp() / ExtPipePlugin.this.timestampDivisor;
                    switch (point.getValue().getDataType()) {
                        case BOOLEAN:
                            this.writer.insertBoolean(str, pathNodes, sinkTimestamp, point.getValue().getBoolean());
                            break;
                        case INT32:
                            this.writer.insertInt32(str, pathNodes, sinkTimestamp, point.getValue().getInt());
                            break;
                        case INT64:
                            this.writer.insertInt64(str, pathNodes, sinkTimestamp, point.getValue().getLong());
                            break;
                        case FLOAT:
                            this.writer.insertFloat(str, pathNodes, sinkTimestamp, point.getValue().getFloat());
                            break;
                        case DOUBLE:
                            this.writer.insertDouble(str, pathNodes, sinkTimestamp, point.getValue().getDouble());
                            break;
                        case TEXT:
                            this.writer.insertText(str, pathNodes, sinkTimestamp, point.getValue().getStringValue());
                            break;
                        default:
                            throw new IllegalArgumentException("Unrecognized data type " + point.getValue().getDataType());
                    }
                }
            }
        }

        /**
         * Forwards a delete operation to the sink writer, dividing its start and end times by the
         * plugin's timestamp divisor.
         *
         * @param str storage-group name passed through to the writer
         * @param deleteOperation operation supplying the path string and the time range
         * @throws IOException propagated from {@code writer.delete}
         */
        private void handleDeleteOperation(String str, DeleteOperation deleteOperation) throws IOException {
            String deletePath = deleteOperation.getDeletePathStr();
            long sinkStartTime = deleteOperation.getStartTime() / ExtPipePlugin.this.timestampDivisor;
            long sinkEndTime = deleteOperation.getEndTime() / ExtPipePlugin.this.timestampDivisor;
            this.writer.delete(str, deletePath, sinkStartTime, sinkEndTime);
        }

        /**
         * Flushes the sink writer, retrying after a failure.
         *
         * <p>The first flush is attempted unconditionally; only an {@link IOException} from it starts
         * the retry sequence. Retries run while the plugin is alive, each preceded by one back-off
         * sleep, for up to {@code configuration.getAttemptTimes() - 1} further attempts. Every failure
         * is counted through {@code handleExceptionsThrownByWriter}.
         *
         * @return {@code true} if any flush attempt succeeded, {@code false} otherwise
         */
        private boolean flushWithRetry() {
            try {
                this.writer.flush();
                return true;
            } catch (IOException firstFailure) {
                ExtPipePlugin.logger.error("Exception happened when flushing operations", firstFailure);
                ExtPipePlugin.this.handleExceptionsThrownByWriter("flush", firstFailure);
            }
            for (int attempt = 1; ExtPipePlugin.this.alive && attempt < this.configuration.getAttemptTimes(); attempt++) {
                try {
                    Thread.sleep(this.configuration.getBackOffInterval());
                } catch (InterruptedException ignored) {
                    // Intentionally ignored: the retry proceeds immediately.
                }
                try {
                    this.writer.flush();
                    return true;
                } catch (Exception retryFailure) {
                    ExtPipePlugin.this.handleExceptionsThrownByWriter("flush", retryFailure);
                }
            }
            return false;
        }
    }

    /**
     * Creates a plugin for the given external pipe type and derives the timestamp divisor from the
     * configured timestamp precision: {@code "ms"} gives 1, {@code "us"} gives 1000 and
     * {@code "ns"} gives 1000000.
     *
     * @param str external pipe type name
     * @param map sink parameters; read later by {@code start()} and passed to the writer factory
     * @param extPipePluginManager manager asked to trigger commits
     * @param pipeOpManager source of the operations to transmit
     * @throws IllegalArgumentException if the configured timestamp precision is none of
     *     {@code "ms"}, {@code "us"}, {@code "ns"}
     */
    public ExtPipePlugin(String str, Map<String, String> map, ExtPipePluginManager extPipePluginManager, PipeOpManager pipeOpManager) {
        this.extPipeTypeName = str;
        this.sinkParams = map;
        this.pipeOpManager = pipeOpManager;
        this.extPipePluginManager = extPipePluginManager;
        String timestampPrecision = IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision();
        switch (timestampPrecision) {
            case "ms":
                this.timestampDivisor = 1;
                break;
            case "us":
                this.timestampDivisor = 1000;
                break;
            case "ns":
                this.timestampDivisor = 1000000;
                break;
            default:
                throw new IllegalArgumentException("Unrecognized time precision: " + timestampPrecision);
        }
    }

    /**
     * Constructor with an empty body: none of the arguments is stored and no field is initialised.
     * In particular {@code timestampDivisor} keeps its default value 0, so the timestamp divisions
     * in the insert/delete handlers would fail on an instance built this way.
     *
     * <p>NOTE(review): looks like a legacy or test-only overload; confirm whether it can be removed
     * or should delegate to the main constructor.
     *
     * @param str ignored
     * @param iExternalPipeSinkWriterFactory ignored
     * @param extPipePluginConfiguration ignored
     * @param tsFilePipe ignored
     */
    public ExtPipePlugin(String str, IExternalPipeSinkWriterFactory iExternalPipeSinkWriterFactory, ExtPipePluginConfiguration extPipePluginConfiguration, TsFilePipe tsFilePipe) {
    }

    /**
     * Injects the writer factory. When a factory has been set, {@code start()} uses it instead of
     * looking one up by the pipe type name.
     *
     * @param iExternalPipeSinkWriterFactory factory to use for creating sink writers
     */
    public void setPipeSinkWriterFactory(IExternalPipeSinkWriterFactory iExternalPipeSinkWriterFactory) {
        this.pipeSinkWriterFactory = iExternalPipeSinkWriterFactory;
    }

    /**
     * Reads an integer sink parameter.
     *
     * @param str parameter name
     * @param i value returned when the parameter is absent
     * @return the parsed parameter value, or {@code i} when the parameter is not present
     * @throws NumberFormatException if the parameter is present but is not a valid integer
     */
    private int getIntParam(String str, int i) {
        String rawValue = this.sinkParams.get(str);
        if (rawValue == null) {
            return i;
        }
        return Integer.parseInt(rawValue);
    }

    /**
     * Starts the external pipe: builds the configuration from the sink parameters, initialises the
     * writer factory, then creates and submits one transmission task per worker thread.
     *
     * <p>Recognised sink parameters and their defaults: {@code thread_num} (1), {@code batch_size}
     * (100000), {@code attempt_times} (3), {@code retry_interval} (1000).
     *
     * <p>The failure-counter map is created before any worker is submitted, because a worker may
     * record a writer failure as soon as it runs.
     *
     * <p>NOTE(review): when start-up fails part-way, the alive flag and any workers already
     * submitted are left as they are; callers presumably need to call {@code stop()} — confirm.
     *
     * @throws IllegalStateException if the pipe is already alive
     * @throws IOException if any step of the start-up fails; the original exception is attached as
     *     the cause
     */
    public void start() throws IOException {
        logger.debug("ExtPipePlugin start(), extPipeName={}.", this.extPipeTypeName);
        if (this.alive) {
            String message = "Can not re-run alive External pipe: " + this.extPipeTypeName + TsFileConstant.PATH_SEPARATOR;
            logger.error(message);
            throw new IllegalStateException(message);
        }
        int threadCount = getIntParam("thread_num", 1);
        int batchSize = getIntParam("batch_size", 100000);
        try {
            this.configuration = new ExtPipePluginConfiguration.Builder(this.extPipeTypeName).numOfThreads(threadCount).operationBatchSize(batchSize).attemptTimes(getIntParam("attempt_times", 3)).backOffInterval(getIntParam("retry_interval", 1000)).build();
            if (this.pipeSinkWriterFactory == null) {
                this.pipeSinkWriterFactory = ExtPipePluginRegister.getInstance().getWriteFactory(this.extPipeTypeName);
            }
            this.pipeSinkWriterFactory.initialize(this.sinkParams);
            // Workers report failures into this map, so it must exist before the first task runs.
            this.writerInvocationFailures = new ConcurrentHashMap<>();
            this.alive = true;
            logger.info("External pipe {} begin to START", this.extPipeTypeName);
            this.executorService = IoTDBThreadPoolFactory.newFixedThreadPool(threadCount, ThreadName.EXT_PIPE_PLUGIN_WORKER.getName() + "-" + this.extPipeTypeName);
            this.dataTransmissionTasks = new ArrayList<>(threadCount);
            for (int i = 0; i < threadCount; i++) {
                DataTransmissionTask task = new DataTransmissionTask(this.pipeSinkWriterFactory.get(), i, this.configuration);
                this.dataTransmissionTasks.add(task);
                this.executorService.submit(task);
            }
            logger.info("External pipe {} finish START.", this.extPipeTypeName);
        } catch (Exception e) {
            logger.error("Failed to start External Pipe: {}.", this.extPipeTypeName, e);
            throw new IOException("Failed to start External Pipe: " + this.extPipeTypeName + ". " + e.getMessage(), e);
        }
    }

    /**
     * Stops the external pipe: clears the alive flag, requests an orderly shutdown of the worker
     * pool and waits up to 10 seconds for it. When the pool has not terminated by then — because
     * of the timeout, an interrupt, or any other throwable — the workers are force-terminated with
     * {@code shutdownNow()}.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt status is restored
     * before returning.
     *
     * @throws IllegalStateException if the pipe is not alive
     */
    public void stop() {
        if (!this.alive) {
            String message = "Error: External pipe " + this.extPipeTypeName + " has not started.";
            logger.error(message);
            throw new IllegalStateException(message);
        }
        this.alive = false;
        this.executorService.shutdown();
        boolean terminated = false;
        try {
            terminated = this.executorService.awaitTermination(10L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            logger.error("Interrupted when waiting for the termination of external pipe, {}", this.extPipeTypeName, e);
            // Preserve the interrupt for the caller instead of silently consuming it.
            Thread.currentThread().interrupt();
        } finally {
            if (!terminated) {
                logger.warn("ExtPipePlugin stop(), graceful termination of external pipe {} timed out. So force terminating working threads.", this.extPipeTypeName);
                this.executorService.shutdownNow();
            }
        }
    }

    /**
     * Reports the run flag of this plugin.
     *
     * @return {@code true} between a {@code start()} that reached the point of setting the flag and
     *     the next {@code stop()}
     */
    public boolean isAlive() {
        return this.alive;
    }

    /**
     * Builds a status snapshot of this plugin: the status of every worker's writer, the alive flag
     * and the failure counters.
     *
     * <p>Any exception raised while collecting the writer statuses is recorded under the
     * {@code "getStatus"} key via {@code handleExceptionsThrownByWriter}; the writer statuses are
     * then left unset on the returned object.
     *
     * @return a newly created status object
     */
    public ExternalPipeStatus getStatus() {
        ExternalPipeStatus snapshot = new ExternalPipeStatus();
        try {
            List<ExternalPipeSinkWriterStatus> writerStatuses = new ArrayList<>();
            for (DataTransmissionTask task : this.dataTransmissionTasks) {
                writerStatuses.add(task.getStatus());
            }
            snapshot.setWriterStatuses(writerStatuses);
        } catch (Exception e) {
            handleExceptionsThrownByWriter("getStatus", e);
        }
        snapshot.setAlive(this.alive);
        snapshot.setWriterInvocationFailures(this.writerInvocationFailures);
        return snapshot;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Counts a writer failure and logs it at info level.
     *
     * <p>Counters are kept per operation name and per exception message; an exception without a
     * message is counted under {@code "N/A"}.
     *
     * @param str name of the writer operation that failed
     * @param exc exception that was thrown
     */
    public void handleExceptionsThrownByWriter(String str, Exception exc) {
        Map<String, AtomicInteger> countersForOperation =
                this.writerInvocationFailures.computeIfAbsent(str, operationName -> new ConcurrentHashMap<>());
        String counterKey = exc.getMessage() != null ? exc.getMessage() : "N/A";
        countersForOperation.computeIfAbsent(counterKey, unseenMessage -> new AtomicInteger(0)).incrementAndGet();
        logger.info("Exception thrown from writer", exc);
    }

    /**
     * Returns the last index committed for a storage group by any worker of this plugin.
     *
     * @param str storage-group name
     * @return the recorded index, or {@code Long.MIN_VALUE} when nothing has been recorded for
     *     that storage group
     */
    public long getDataCommitIndex(String str) {
        Long recorded = this.dataCommitMap.getOrDefault(str, Long.MIN_VALUE);
        long commitIndex = recorded;
        return commitIndex;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Maps a storage-group name to the index of the worker that serves it.
     *
     * <p>The remainder is taken before the absolute value. This yields the same index as
     * {@code Math.abs(hash) % n} for every hash code except {@code Integer.MIN_VALUE}, where
     * {@code Math.abs} overflows and stays negative, which would produce a negative index.
     *
     * @param str storage-group name
     * @return an index in {@code [0, numOfThreads)}
     */
    public int getThreadIndex(String str) {
        int workerCount = this.configuration.getNumOfThreads();
        return Math.abs(str.hashCode() % workerCount);
    }

    /**
     * Forwards a new-data notification to the worker that serves the given storage group.
     *
     * @param str storage-group name
     * @param j begin index of the new data
     * @param j2 amount of new data
     */
    public void notifyNewDataArrive(String str, long j, long j2) {
        logger.debug("notifyNewDataArrive(), sgName={}, newDataBeginIndex={}, newDataCount={}", str, j, j2);
        int workerIndex = getThreadIndex(str);
        DataTransmissionTask responsibleTask = this.dataTransmissionTasks.get(workerIndex);
        if (responsibleTask == null) {
            return;
        }
        responsibleTask.notifyNewDataArrive(str, j, j2);
    }
}
