package org.apache.iotdb.commons.pipe.connector.protocol;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
import org.apache.iotdb.commons.pipe.connector.PipeReceiverStatusHandler;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.class */
/**
 * Base class for pipe connectors that are configured with a set of target node endpoints.
 *
 * <p>It validates the endpoint, load-balance and conflict-resolution parameters in
 * {@link #validate(PipeParameterValidator)}, and in
 * {@link #customize(PipeParameters, PipeConnectorRuntimeConfiguration)} resolves the endpoints
 * into {@link #nodeUrls}, reads the tablet batch-mode flag and builds the
 * {@link PipeReceiverStatusHandler}. Every parameter is accepted under both its
 * {@code CONNECTOR_*} key and its {@code SINK_*} key.
 */
public abstract class IoTDBConnector implements PipeConnector {
    private static final String PARSE_URL_ERROR_FORMATTER = "Exception occurred while parsing node urls from target servers: {}";
    private static final String PARSE_URL_ERROR_MESSAGE = "Error occurred while parsing node urls from target servers, please check the specified 'host':'port' or 'node-urls'";
    private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBConnector.class);

    /** Trimmed, lower-cased load-balance strategy name; assigned in {@link #validate}. */
    protected String loadBalanceStrategy;

    /** Built from the exception-handling parameters in {@link #customize}. */
    protected PipeReceiverStatusHandler receiverStatusHandler;

    /** Target endpoints in the order they were resolved; refilled on every {@link #customize}. */
    protected final List<TEndPoint> nodeUrls = new ArrayList<>();

    /** Value of the batch-mode parameter; {@code true} unless configured otherwise. */
    protected boolean isTabletBatchModeEnabled = true;

    /**
     * Checks that a target is specified and that the load-balance and conflict-resolution
     * strategies carry supported values. As a side effect, stores the normalized load-balance
     * strategy in {@link #loadBalanceStrategy}.
     *
     * @param pipeParameterValidator validator wrapping the connector parameters
     * @throws Exception if any validation rule is rejected by the validator
     */
    public void validate(PipeParameterValidator pipeParameterValidator) throws Exception {
        final PipeParameters parameters = pipeParameterValidator.getParameters();

        // A target is either a node-urls attribute, or an (ip | host) attribute together with
        // a port attribute, given under the connector keys or under the sink keys.
        // The lambda parameter is typed explicitly so that only the multi-argument validate
        // overload can match this call.
        pipeParameterValidator.validate(
                (Object[] present) -> {
                    final boolean connectorNodeUrls = (Boolean) present[0];
                    final boolean connectorIp = (Boolean) present[1];
                    final boolean connectorHost = (Boolean) present[2];
                    final boolean connectorPort = (Boolean) present[3];
                    final boolean sinkNodeUrls = (Boolean) present[4];
                    final boolean sinkIp = (Boolean) present[5];
                    final boolean sinkHost = (Boolean) present[6];
                    final boolean sinkPort = (Boolean) present[7];
                    return connectorNodeUrls
                            || ((connectorIp || connectorHost) && connectorPort)
                            || sinkNodeUrls
                            || ((sinkIp || sinkHost) && sinkPort);
                },
                String.format(
                        "One of %s, %s:%s, %s, %s:%s must be specified",
                        PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY,
                        PipeConnectorConstant.CONNECTOR_IOTDB_HOST_KEY,
                        PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY,
                        PipeConnectorConstant.SINK_IOTDB_NODE_URLS_KEY,
                        PipeConnectorConstant.SINK_IOTDB_HOST_KEY,
                        PipeConnectorConstant.SINK_IOTDB_PORT_KEY),
                new Object[] {
                    parameters.hasAttribute(PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY),
                    parameters.hasAttribute(PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY),
                    parameters.hasAttribute(PipeConnectorConstant.CONNECTOR_IOTDB_HOST_KEY),
                    parameters.hasAttribute(PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY),
                    parameters.hasAttribute(PipeConnectorConstant.SINK_IOTDB_NODE_URLS_KEY),
                    parameters.hasAttribute(PipeConnectorConstant.SINK_IOTDB_IP_KEY),
                    parameters.hasAttribute(PipeConnectorConstant.SINK_IOTDB_HOST_KEY),
                    parameters.hasAttribute(PipeConnectorConstant.SINK_IOTDB_PORT_KEY)
                });

        // Locale.ROOT keeps the normalization independent of the JVM default locale; a
        // locale-sensitive lower-casing can map 'I' to a dotless 'ı' and make a valid
        // value fail the comparisons below.
        this.loadBalanceStrategy =
                parameters
                        .getStringOrDefault(
                                Arrays.asList(
                                        PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_KEY,
                                        PipeConnectorConstant.SINK_LOAD_BALANCE_STRATEGY_KEY),
                                PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY)
                        .trim()
                        .toLowerCase(Locale.ROOT);
        pipeParameterValidator.validate(
                strategy -> PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_SET.contains(this.loadBalanceStrategy),
                String.format(
                        "Load balance strategy should be one of %s, but got %s.",
                        PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_SET,
                        this.loadBalanceStrategy),
                this.loadBalanceStrategy);

        final String conflictResolveStrategy =
                parameters
                        .getStringOrDefault(
                                Arrays.asList(
                                        PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY,
                                        PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY),
                                PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_DEFAULT_VALUE)
                        .trim()
                        .toLowerCase(Locale.ROOT);
        pipeParameterValidator.validate(
                strategy ->
                        strategy.equals(PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_DEFAULT_VALUE)
                                || strategy.equals("ignore"),
                String.format(
                        "The value of key %s or %s must be either 'retry' or 'ignore'.",
                        PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY,
                        PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY),
                conflictResolveStrategy);
    }

    /**
     * Resolves the target endpoints and the runtime options from the given parameters.
     *
     * @param pipeParameters connector parameters to read
     * @param pipeConnectorRuntimeConfiguration runtime configuration; not used by this class
     * @throws Exception if the target endpoints cannot be parsed or are invalid
     */
    public void customize(PipeParameters pipeParameters, PipeConnectorRuntimeConfiguration pipeConnectorRuntimeConfiguration) throws Exception {
        // Clear first: parseNodeUrls seeds its result with the current content of nodeUrls.
        this.nodeUrls.clear();
        this.nodeUrls.addAll(parseNodeUrls(pipeParameters));
        LOGGER.info("IoTDBConnector nodeUrls: {}", this.nodeUrls);

        this.isTabletBatchModeEnabled =
                pipeParameters.getBooleanOrDefault(
                        Arrays.asList(
                                PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY,
                                PipeConnectorConstant.SINK_IOTDB_BATCH_MODE_ENABLE_KEY),
                        true);
        LOGGER.info("IoTDBConnector isTabletBatchModeEnabled: {}", this.isTabletBatchModeEnabled);

        // The handler arguments are read in the same order as they are passed below.
        final boolean usesDefaultConflictStrategy =
                pipeParameters
                        .getStringOrDefault(
                                Arrays.asList(
                                        PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY,
                                        PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY),
                                PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_DEFAULT_VALUE)
                        .trim()
                        .equalsIgnoreCase(PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_DEFAULT_VALUE);
        final long conflictRetryMaxTimeSeconds =
                pipeParameters.getLongOrDefault(
                        Arrays.asList(
                                PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_KEY,
                                PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_KEY),
                        60L);
        final boolean conflictRecordIgnoredData =
                pipeParameters.getBooleanOrDefault(
                        Arrays.asList(
                                PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_KEY,
                                PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_KEY),
                        true);
        final long othersRetryMaxTimeSeconds =
                pipeParameters.getLongOrDefault(
                        Arrays.asList(
                                PipeConnectorConstant.CONNECTOR_EXCEPTION_OTHERS_RETRY_MAX_TIME_SECONDS_KEY,
                                PipeConnectorConstant.SINK_EXCEPTION_OTHERS_RETRY_MAX_TIME_SECONDS_KEY),
                        -1L);
        final boolean othersRecordIgnoredData =
                pipeParameters.getBooleanOrDefault(
                        Arrays.asList(
                                PipeConnectorConstant.CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY,
                                PipeConnectorConstant.SINK_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY),
                        true);

        this.receiverStatusHandler =
                new PipeReceiverStatusHandler(
                        usesDefaultConflictStrategy,
                        conflictRetryMaxTimeSeconds,
                        conflictRecordIgnoredData,
                        othersRetryMaxTimeSeconds,
                        othersRecordIgnoredData);
    }

    /**
     * Collects the target endpoints described by the parameters.
     *
     * <p>The result starts with the current content of {@link #nodeUrls}, followed by the
     * ip/port and host/port pairs and then the comma-separated node-urls lists, each first for
     * the connector keys and then for the sink keys. Duplicates are dropped while the insertion
     * order is kept.
     *
     * @param pipeParameters connector parameters to read
     * @return the ordered, de-duplicated endpoints
     * @throws PipeParameterNotValidException if an attribute cannot be parsed, or if an endpoint
     *     has an empty host or a port of {@code 0}
     */
    protected LinkedHashSet<TEndPoint> parseNodeUrls(PipeParameters pipeParameters) throws PipeParameterNotValidException {
        final LinkedHashSet<TEndPoint> endPoints = new LinkedHashSet<>(this.nodeUrls);

        // { address key, port key } pairs, in the order their endpoints are added.
        final String[][] addressPortKeyPairs = {
            {PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY, PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY},
            {PipeConnectorConstant.SINK_IOTDB_IP_KEY, PipeConnectorConstant.SINK_IOTDB_PORT_KEY},
            {PipeConnectorConstant.CONNECTOR_IOTDB_HOST_KEY, PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY},
            {PipeConnectorConstant.SINK_IOTDB_HOST_KEY, PipeConnectorConstant.SINK_IOTDB_PORT_KEY}
        };
        final String[] nodeUrlsKeys = {
            PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY,
            PipeConnectorConstant.SINK_IOTDB_NODE_URLS_KEY
        };

        try {
            for (final String[] keyPair : addressPortKeyPairs) {
                final String addressKey = keyPair[0];
                final String portKey = keyPair[1];
                if (pipeParameters.hasAttribute(addressKey) && pipeParameters.hasAttribute(portKey)) {
                    endPoints.add(
                            new TEndPoint(
                                    pipeParameters.getStringByKeys(new String[] {addressKey}),
                                    pipeParameters.getIntByKeys(new String[] {portKey})));
                }
            }
            for (final String nodeUrlsKey : nodeUrlsKeys) {
                if (pipeParameters.hasAttribute(nodeUrlsKey)) {
                    final String joinedUrls = pipeParameters.getStringByKeys(new String[] {nodeUrlsKey});
                    endPoints.addAll(NodeUrlUtils.parseTEndPointUrls(Arrays.asList(joinedUrls.split(","))));
                }
            }
        } catch (Exception e) {
            // The trailing throwable is not bound to a placeholder, so the logger records its
            // stack trace; the exception thrown to the caller keeps the fixed user-facing message.
            LOGGER.warn(PARSE_URL_ERROR_FORMATTER, e.toString(), e);
            throw new PipeParameterNotValidException(PARSE_URL_ERROR_MESSAGE);
        }

        // Run outside the try block: checkNodeUrls logs its own reason and already throws the
        // final exception, so it must not be caught, logged again and re-created above.
        checkNodeUrls(endPoints);
        return endPoints;
    }

    /**
     * Rejects endpoints whose host is {@code null} or empty, or whose port is {@code 0}.
     *
     * @param set endpoints to check
     * @throws PipeParameterNotValidException on the first invalid endpoint
     */
    private void checkNodeUrls(Set<TEndPoint> set) throws PipeParameterNotValidException {
        for (final TEndPoint endPoint : set) {
            if (Objects.isNull(endPoint.ip) || endPoint.ip.isEmpty()) {
                LOGGER.warn(PARSE_URL_ERROR_FORMATTER, "host cannot be empty");
                throw new PipeParameterNotValidException(PARSE_URL_ERROR_MESSAGE);
            }
            if (endPoint.port == 0) {
                LOGGER.warn(PARSE_URL_ERROR_FORMATTER, "port cannot be empty");
                throw new PipeParameterNotValidException(PARSE_URL_ERROR_MESSAGE);
            }
        }
    }

    /**
     * Returns the receiver status handler built by the last call to {@link #customize}.
     *
     * @return the handler, or {@code null} if {@code customize} has not been called yet
     */
    public PipeReceiverStatusHandler statusHandler() {
        return this.receiverStatusHandler;
    }
}
