package org.apache.iotdb.session.subscription;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
import org.apache.iotdb.session.util.SessionUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/session/subscription/SubscriptionConsumer.class */
public abstract class SubscriptionConsumer implements AutoCloseable {
    /** Logger shared by every consumer instance. */
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) SubscriptionConsumer.class);
    /**
     * Single pre-built exception instance thrown when no subscription provider can serve a request.
     * NOTE(review): the instance is created once at class initialization, so its stack trace
     * presumably does not identify the call site that throws it — confirm this is acceptable.
     */
    private static final IoTDBConnectionException NO_PROVIDERS_EXCEPTION = new IoTDBConnectionException("Cluster has no available subscription providers to connect");
    /** Endpoints tried by openProviders(); filled in the constructor from host/port or from nodeUrls. */
    private final List<TEndPoint> initialEndpoints;
    /** User name handed to every SubscriptionProvider this consumer creates. */
    private final String username;
    /** Password handed to every SubscriptionProvider this consumer creates. */
    private final String password;
    /** Consumer id; when null it is taken from the provider after a handshake. */
    private String consumerId;
    /** Consumer group id; when null it is taken from the provider after a handshake. */
    private String consumerGroupId;
    /** Period, in milliseconds, at which the heartbeat worker is scheduled. */
    private final long heartbeatIntervalMs;
    /** Period, in milliseconds, at which the endpoints syncer is scheduled. */
    private final long endpointsSyncIntervalMs;
    /** Connected providers keyed by data node id (a ConcurrentSkipListMap, see constructor). */
    private final SortedMap<Integer, SubscriptionProvider> subscriptionProviders;
    /** Fair read/write lock: open()/close() take the write lock, subscribe()/unsubscribe() the read lock. */
    private final ReentrantReadWriteLock subscriptionProvidersLock;
    /** Executor running the heartbeat worker; created in open(), nulled in close(). */
    private ScheduledExecutorService heartbeatWorkerExecutor;
    /** Executor running the endpoints syncer; created in open(), nulled in close(). */
    private ScheduledExecutorService endpointsSyncerExecutor;
    /** True until open() succeeds and again after close(); starts as true. */
    private final AtomicBoolean isClosed;

    /* loaded from: input_file:org/apache/iotdb/session/subscription/SubscriptionConsumer$Builder.class */
    /**
     * Fluent holder for the settings a {@link SubscriptionConsumer} is constructed from.
     *
     * <p>Every setter returns this builder so calls can be chained. {@code nodeUrls} is only
     * consulted by the consumer constructor when {@code host} has been set to {@code null}.
     */
    public abstract static class Builder {
        /** Smallest heartbeat period accepted; smaller requests are raised to this value. */
        private static final long MIN_HEARTBEAT_INTERVAL_MS = 1000L;
        /** Smallest endpoints-sync period accepted; smaller requests are raised to this value. */
        private static final long MIN_ENDPOINTS_SYNC_INTERVAL_MS = 5000L;

        protected String consumerId;
        protected String consumerGroupId;
        protected String host = "localhost";
        protected int port = SessionConfig.DEFAULT_PORT;
        protected List<String> nodeUrls = null;
        protected String username = "root";
        protected String password = "root";
        protected long heartbeatIntervalMs = 5000;
        protected long endpointsSyncIntervalMs = 30000;

        /** Sets the host of the single endpoint to connect to (default {@code "localhost"}). */
        public Builder host(String str) {
            host = str;
            return this;
        }

        /** Sets the port paired with {@link #host(String)}. */
        public Builder port(int i) {
            port = i;
            return this;
        }

        /** Sets the seed node URLs; the list reference is stored as given (no copy). */
        public Builder nodeUrls(List<String> list) {
            nodeUrls = list;
            return this;
        }

        /** Sets the user name (default {@code "root"}). */
        public Builder username(String str) {
            username = str;
            return this;
        }

        /** Sets the password (default {@code "root"}). */
        public Builder password(String str) {
            password = str;
            return this;
        }

        /** Sets the consumer id; may be left unset ({@code null}). */
        public Builder consumerId(String str) {
            consumerId = str;
            return this;
        }

        /** Sets the consumer group id; may be left unset ({@code null}). */
        public Builder consumerGroupId(String str) {
            consumerGroupId = str;
            return this;
        }

        /** Sets the heartbeat period in milliseconds; values below 1000 are raised to 1000. */
        public Builder heartbeatIntervalMs(long j) {
            heartbeatIntervalMs = j < MIN_HEARTBEAT_INTERVAL_MS ? MIN_HEARTBEAT_INTERVAL_MS : j;
            return this;
        }

        /** Sets the endpoints-sync period in milliseconds; values below 5000 are raised to 5000. */
        public Builder endpointsSyncIntervalMs(long j) {
            endpointsSyncIntervalMs = j < MIN_ENDPOINTS_SYNC_INTERVAL_MS ? MIN_ENDPOINTS_SYNC_INTERVAL_MS : j;
            return this;
        }

        /** Builds a pull-style consumer from the current settings. */
        public abstract SubscriptionPullConsumer buildPullConsumer();

        /** Builds a push-style consumer from the current settings. */
        public abstract SubscriptionPushConsumer buildPushConsumer();
    }

    /**
     * Returns the consumer id currently held by this consumer.
     *
     * @return the consumer id; may be {@code null} if none was configured and no handshake has
     *     supplied one yet
     */
    public String getConsumerId() {
        return consumerId;
    }

    /**
     * Returns the consumer group id currently held by this consumer.
     *
     * @return the consumer group id; may be {@code null} if none was configured and no handshake
     *     has supplied one yet
     */
    public String getConsumerGroupId() {
        return consumerGroupId;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a consumer in the closed state from the builder's settings; call {@link #open()}
     * to connect.
     *
     * <p>{@code host:port} takes priority: when {@code builder.host} is non-null that single
     * endpoint is used and {@code builder.nodeUrls} is ignored; otherwise the endpoints are parsed
     * from {@code builder.nodeUrls}.
     *
     * @param builder source of connection, identity and scheduling settings
     */
    public SubscriptionConsumer(Builder builder) {
        // Typed instantiations instead of raw types, so the compiler checks element types.
        this.subscriptionProviders = new ConcurrentSkipListMap<>();
        this.subscriptionProvidersLock = new ReentrantReadWriteLock(true); // fair lock
        this.isClosed = new AtomicBoolean(true);
        this.initialEndpoints = new ArrayList<>();
        if (builder.host != null) {
            this.initialEndpoints.add(new TEndPoint(builder.host, builder.port));
        } else {
            this.initialEndpoints.addAll(SessionUtils.parseSeedNodeUrls(builder.nodeUrls));
        }
        this.username = builder.username;
        this.password = builder.password;
        this.consumerId = builder.consumerId;
        this.consumerGroupId = builder.consumerGroupId;
        this.heartbeatIntervalMs = builder.heartbeatIntervalMs;
        this.endpointsSyncIntervalMs = builder.endpointsSyncIntervalMs;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a consumer whose settings are read from {@code properties} and written into
     * {@code builder} before delegating to {@link #SubscriptionConsumer(Builder)}.
     *
     * <p>Missing keys fall back to: host {@code "localhost"}, port
     * {@code SessionConfig.DEFAULT_PORT}, username/password {@code "root"}, heartbeat interval
     * 5000 ms and endpoints-sync interval 30000 ms. Values are cast directly, so they must
     * already be stored as {@code String}, {@code Integer}, {@code Long} or {@code List}
     * respectively; any other type fails with a {@link ClassCastException}.
     *
     * @param builder builder that receives the settings (it is mutated)
     * @param properties configuration keyed by the {@code ConsumerConstant} keys
     */
    public SubscriptionConsumer(Builder builder, Properties properties) {
        this(
            builder
                .host((String) properties.getOrDefault(ConsumerConstant.HOST_KEY, "localhost"))
                .port((Integer) properties.getOrDefault("port", SessionConfig.DEFAULT_PORT))
                .nodeUrls((List<String>) properties.get(ConsumerConstant.NODE_URLS_KEY))
                .username((String) properties.getOrDefault(ConsumerConstant.USERNAME_KEY, "root"))
                .password((String) properties.getOrDefault(ConsumerConstant.PASSWORD_KEY, "root"))
                .consumerId((String) properties.get(ConsumerConstant.CONSUMER_ID_KEY))
                .consumerGroupId((String) properties.get(ConsumerConstant.CONSUMER_GROUP_ID_KEY))
                .heartbeatIntervalMs(
                    (Long) properties.getOrDefault(ConsumerConstant.HEARTBEAT_INTERVAL_MS_KEY, 5000L))
                .endpointsSyncIntervalMs(
                    (Long) properties.getOrDefault(ConsumerConstant.ENDPOINTS_SYNC_INTERVAL_MS_KEY, 30000L)));
    }

    /**
     * Opens the consumer if it is currently closed; a call on an already open consumer does
     * nothing.
     *
     * <p>Under the providers write lock this connects the providers, starts the heartbeat worker
     * and the endpoints syncer, and only then marks the consumer as open.
     *
     * @throws IoTDBConnectionException if no provider could be connected
     */
    public synchronized void open() throws TException, IoTDBConnectionException, IOException, StatementExecutionException {
        if (!isClosed.get()) {
            return;
        }
        acquireWriteLock();
        try {
            openProviders();
            launchHeartbeatWorker();
            launchEndpointsSyncer();
            // Flip the flag last so a failure above leaves the consumer marked as closed.
            isClosed.set(false);
        } finally {
            releaseWriteLock();
        }
    }

    /**
     * Closes the consumer if it is open; a call on an already closed consumer does nothing.
     *
     * <p>Stops the endpoints syncer and the heartbeat worker, then closes all providers under the
     * providers write lock. The consumer is marked closed even when one of these steps throws.
     *
     * @throws IoTDBConnectionException if closing the providers fails
     */
    @Override
    public synchronized void close() throws IoTDBConnectionException {
        if (isClosed.get()) {
            return;
        }
        try {
            shutdownEndpointsSyncer();
            shutdownHeartbeatWorker();
            acquireWriteLock();
            try {
                closeProviders();
            } finally {
                // Release exactly once, whether or not closeProviders() threw.
                releaseWriteLock();
            }
        } finally {
            isClosed.set(true);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Reports whether the consumer is currently closed.
     *
     * @return {@code true} before {@link #open()} has succeeded and after {@link #close()}
     */
    public boolean isClosed() {
        return isClosed.get();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Blocks until the read side of the providers lock is held by the calling thread. */
    public void acquireReadLock() {
        final ReentrantReadWriteLock.ReadLock readLock = subscriptionProvidersLock.readLock();
        readLock.lock();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Releases one hold of the read side of the providers lock. */
    public void releaseReadLock() {
        final ReentrantReadWriteLock.ReadLock readLock = subscriptionProvidersLock.readLock();
        readLock.unlock();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Blocks until the write side of the providers lock is held by the calling thread. */
    public void acquireWriteLock() {
        final ReentrantReadWriteLock.WriteLock writeLock = subscriptionProvidersLock.writeLock();
        writeLock.lock();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Releases one hold of the write side of the providers lock. */
    public void releaseWriteLock() {
        final ReentrantReadWriteLock.WriteLock writeLock = subscriptionProvidersLock.writeLock();
        writeLock.unlock();
    }

    /**
     * Subscribes to a single topic by delegating to {@link #subscribe(Set)} with a one-element
     * set.
     *
     * @param str the topic to subscribe to
     * @throws IoTDBConnectionException if no available provider accepted the request
     */
    public void subscribe(String str) throws TException, IOException, StatementExecutionException, IoTDBConnectionException {
        final Set<String> singleTopic = Collections.singleton(str);
        subscribe(singleTopic);
    }

    /**
     * Subscribes to several topics; duplicates in {@code strArr} are collapsed because the names
     * are collected into a set before delegating to {@link #subscribe(Set)}.
     *
     * @param strArr the topics to subscribe to
     * @throws IoTDBConnectionException if no available provider accepted the request
     */
    public void subscribe(String... strArr) throws TException, IOException, StatementExecutionException, IoTDBConnectionException {
        // Typed local instead of a raw HashSet, which also pins the Set<String> overload.
        final Set<String> topics = new HashSet<>(Arrays.asList(strArr));
        subscribe(topics);
    }

    /**
     * Subscribes to the given topics while holding the providers read lock, trying the available
     * providers in turn via {@link #subscribeWithRedirection(Set)}.
     *
     * @param set the topics to subscribe to
     * @throws IoTDBConnectionException if no available provider accepted the request
     */
    public void subscribe(Set<String> set) throws TException, IOException, StatementExecutionException, IoTDBConnectionException {
        acquireReadLock();
        try {
            subscribeWithRedirection(set);
        } finally {
            // Always give the read lock back, even when every provider failed.
            releaseReadLock();
        }
    }

    /**
     * Unsubscribes from a single topic by delegating to {@link #unsubscribe(Set)} with a
     * one-element set.
     *
     * @param str the topic to unsubscribe from
     * @throws IoTDBConnectionException if no available provider accepted the request
     */
    public void unsubscribe(String str) throws TException, IOException, StatementExecutionException, IoTDBConnectionException {
        final Set<String> singleTopic = Collections.singleton(str);
        unsubscribe(singleTopic);
    }

    /**
     * Unsubscribes from several topics; duplicates in {@code strArr} are collapsed because the
     * names are collected into a set before delegating to {@link #unsubscribe(Set)}.
     *
     * @param strArr the topics to unsubscribe from
     * @throws IoTDBConnectionException if no available provider accepted the request
     */
    public void unsubscribe(String... strArr) throws TException, IOException, StatementExecutionException, IoTDBConnectionException {
        // Typed local instead of a raw HashSet, which also pins the Set<String> overload.
        final Set<String> topics = new HashSet<>(Arrays.asList(strArr));
        unsubscribe(topics);
    }

    /**
     * Unsubscribes from the given topics while holding the providers read lock, trying the
     * available providers in turn via {@link #unsubscribeWithRedirection(Set)}.
     *
     * @param set the topics to unsubscribe from
     * @throws IoTDBConnectionException if no available provider accepted the request
     */
    public void unsubscribe(Set<String> set) throws TException, IOException, StatementExecutionException, IoTDBConnectionException {
        acquireReadLock();
        try {
            unsubscribeWithRedirection(set);
        } finally {
            // Always give the read lock back, even when every provider failed.
            releaseReadLock();
        }
    }

    /**
     * Creates a single-threaded scheduler whose thread is a normal-priority daemon named
     * {@code ConsumerHeartbeatWorker}, and schedules a {@code ConsumerHeartbeatWorker} on it at a
     * fixed rate of {@code heartbeatIntervalMs} with no initial delay.
     */
    private void launchHeartbeatWorker() {
        heartbeatWorkerExecutor =
            Executors.newSingleThreadScheduledExecutor(
                task -> {
                    final ThreadGroup group = Thread.currentThread().getThreadGroup();
                    final Thread worker = new Thread(group, task, "ConsumerHeartbeatWorker", 0L);
                    if (!worker.isDaemon()) {
                        worker.setDaemon(true);
                    }
                    if (worker.getPriority() != Thread.NORM_PRIORITY) {
                        worker.setPriority(Thread.NORM_PRIORITY);
                    }
                    return worker;
                });
        heartbeatWorkerExecutor.scheduleAtFixedRate(
            new ConsumerHeartbeatWorker(this), 0L, heartbeatIntervalMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Requests an orderly shutdown of the heartbeat scheduler and drops the reference to it.
     * Expects the scheduler to exist, i.e. to be called only on an opened consumer.
     */
    private void shutdownHeartbeatWorker() {
        final ScheduledExecutorService executor = heartbeatWorkerExecutor;
        executor.shutdown();
        heartbeatWorkerExecutor = null;
    }

    /**
     * Creates a single-threaded scheduler whose thread is a normal-priority daemon named
     * {@code SubscriptionEndpointsSyncer}, and schedules a {@code SubscriptionEndpointsSyncer} on
     * it at a fixed rate of {@code endpointsSyncIntervalMs} with no initial delay.
     */
    private void launchEndpointsSyncer() {
        endpointsSyncerExecutor =
            Executors.newSingleThreadScheduledExecutor(
                task -> {
                    final ThreadGroup group = Thread.currentThread().getThreadGroup();
                    final Thread worker = new Thread(group, task, "SubscriptionEndpointsSyncer", 0L);
                    if (!worker.isDaemon()) {
                        worker.setDaemon(true);
                    }
                    if (worker.getPriority() != Thread.NORM_PRIORITY) {
                        worker.setPriority(Thread.NORM_PRIORITY);
                    }
                    return worker;
                });
        endpointsSyncerExecutor.scheduleAtFixedRate(
            new SubscriptionEndpointsSyncer(this), 0L, endpointsSyncIntervalMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Requests an orderly shutdown of the endpoints-syncer scheduler and drops the reference to
     * it. Expects the scheduler to exist, i.e. to be called only on an opened consumer.
     */
    private void shutdownEndpointsSyncer() {
        final ScheduledExecutorService executor = endpointsSyncerExecutor;
        executor.shutdown();
        endpointsSyncerExecutor = null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a provider for the given endpoint with this consumer's credentials and ids, and
     * performs its handshake.
     *
     * <p>If this consumer has no consumer id or consumer group id yet, the values reported by the
     * provider after the handshake are adopted.
     *
     * @param tEndPoint endpoint the new provider connects to
     * @return the provider, after a successful handshake
     */
    public SubscriptionProvider constructProviderAndHandshake(TEndPoint tEndPoint) throws TException, IoTDBConnectionException, IOException, StatementExecutionException {
        final SubscriptionProvider provider =
            new SubscriptionProvider(tEndPoint, username, password, consumerId, consumerGroupId);
        // NOTE(review): a failing handshake leaves `provider` unclosed here — confirm whether it
        // can already hold a connection at that point.
        provider.handshake();
        if (consumerId == null) {
            consumerId = provider.getConsumerId();
        }
        if (consumerGroupId == null) {
            consumerGroupId = provider.getConsumerGroupId();
        }
        return provider;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Closes any existing providers, then connects to the cluster through the first initial
     * endpoint that accepts a handshake.
     *
     * <p>Once a seed endpoint is connected, its provider is asked for all endpoints and a provider
     * is created for every other data node. Failures for individual data nodes, or of the endpoint
     * lookup itself, are only logged. Iteration stops after the first successful seed: continuing
     * would handshake again with data nodes that are already connected and overwrite their
     * providers in the map without closing them.
     *
     * @throws IoTDBConnectionException if closing stale providers fails, or if no provider could
     *     be connected at all
     */
    public void openProviders() throws IoTDBConnectionException {
        closeProviders();
        for (final TEndPoint seedEndPoint : initialEndpoints) {
            try {
                final SubscriptionProvider seedProvider = constructProviderAndHandshake(seedEndPoint);
                final int seedDataNodeId = seedProvider.getDataNodeId();
                addProvider(seedDataNodeId, seedProvider);
                try {
                    final Map<Integer, TEndPoint> allEndPoints =
                        seedProvider.getSessionConnection().fetchAllEndPoints();
                    for (final Map.Entry<Integer, TEndPoint> entry : allEndPoints.entrySet()) {
                        if (seedDataNodeId == entry.getKey().intValue()) {
                            continue; // the seed data node is already connected
                        }
                        try {
                            addProvider(entry.getKey().intValue(), constructProviderAndHandshake(entry.getValue()));
                        } catch (Exception e) {
                            LOGGER.warn("Failed to create connection with {}, exception: {}, will retry later...", entry.getValue(), e.getMessage());
                        }
                    }
                } catch (Exception e2) {
                    LOGGER.warn("Failed to fetch all endpoints from {}, exception: {}, will retry later...", seedEndPoint, e2.getMessage());
                }
                // One working seed is enough; remaining seeds would only duplicate providers.
                break;
            } catch (Exception e3) {
                LOGGER.warn("Failed to create connection with {}, exception: {}", seedEndPoint, e3.getMessage());
            }
        }
        if (hasNoProviders()) {
            throw NO_PROVIDERS_EXCEPTION;
        }
    }

    /**
     * Closes every registered provider and empties the provider map.
     *
     * <p>A failure to close one provider does not stop the others from being closed, and the map
     * is cleared in all cases. The first connection failure is rethrown once all providers have
     * been attempted, with any later failures attached as suppressed exceptions.
     *
     * @throws IoTDBConnectionException if at least one provider failed to close
     */
    private void closeProviders() throws IoTDBConnectionException {
        IoTDBConnectionException firstFailure = null;
        try {
            // getAllProviders() returns a snapshot, so clearing the map below cannot disturb the loop.
            for (final SubscriptionProvider provider : getAllProviders()) {
                try {
                    provider.close();
                } catch (IoTDBConnectionException e) {
                    if (firstFailure == null) {
                        firstFailure = e;
                    } else {
                        firstFailure.addSuppressed(e);
                    }
                }
            }
        } finally {
            subscriptionProviders.clear();
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Registers a provider under the given data node id, replacing any provider previously stored
     * under that id (the replaced provider is not closed here).
     *
     * @param i data node id used as the map key
     * @param subscriptionProvider provider to register
     */
    public void addProvider(int i, SubscriptionProvider subscriptionProvider) {
        LOGGER.info("add new subscription provider {}", subscriptionProvider);
        subscriptionProviders.put(i, subscriptionProvider);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Closes the provider registered under the given data node id and removes it from the map;
     * does nothing when no provider is registered under that id.
     *
     * <p>The provider is removed (and the removal logged) even if closing it throws.
     *
     * @param i data node id of the provider to drop
     * @throws IoTDBConnectionException if closing the provider fails
     */
    public void closeAndRemoveProvider(int i) throws IoTDBConnectionException {
        final Integer dataNodeId = i;
        // Single lookup: a separate containsKey + get pair could see the entry vanish in between.
        final SubscriptionProvider staleProvider = subscriptionProviders.get(dataNodeId);
        if (staleProvider == null) {
            return;
        }
        try {
            staleProvider.close();
        } finally {
            LOGGER.info("close and remove stale subscription provider {}", staleProvider);
            subscriptionProviders.remove(dataNodeId);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Reports whether the provider map is empty.
     *
     * @return {@code true} when no provider is registered
     */
    public boolean hasNoProviders() {
        return subscriptionProviders.isEmpty();
    }

    /**
     * Reports whether a provider is registered under the given data node id.
     *
     * @param i data node id to look up
     * @return {@code true} when the provider map has an entry for {@code i}
     */
    boolean containsProvider(int i) {
        return subscriptionProviders.containsKey(i);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the registered providers that currently report themselves as available.
     *
     * @return a new list, in map (data node id) order, of providers whose {@code isAvailable()}
     *     is true; empty when there are none
     */
    public List<SubscriptionProvider> getAllAvailableProviders() {
        // Typed pipeline: no raw List cast is needed once the stream element type is kept.
        return subscriptionProviders.values().stream()
            .filter(SubscriptionProvider::isAvailable)
            .collect(Collectors.toList());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns a snapshot of all registered providers, available or not.
     *
     * @return a new, independent list of the providers currently in the map; later changes to
     *     the map are not reflected in it
     */
    public List<SubscriptionProvider> getAllProviders() {
        // Diamond instead of a raw ArrayList so the element type is checked at compile time.
        return new ArrayList<>(subscriptionProviders.values());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Looks up the provider registered under the given data node id.
     *
     * @param i data node id to look up
     * @return the provider, or {@code null} when none is registered under {@code i}
     */
    public SubscriptionProvider getProvider(int i) {
        return containsProvider(i) ? subscriptionProviders.get(i) : null;
    }

    /**
     * Sends the subscribe request to the available providers one after another until one of them
     * accepts it. A failing provider is logged and the next one is tried.
     *
     * <p>This method does not take the providers lock itself.
     *
     * @param set the topics to subscribe to
     * @throws IoTDBConnectionException if there is no available provider or every one failed
     */
    public void subscribeWithRedirection(Set<String> set) throws IoTDBConnectionException {
        final List<SubscriptionProvider> candidates = getAllAvailableProviders();
        for (final SubscriptionProvider candidate : candidates) {
            try {
                candidate.getSessionConnection().subscribe(set);
            } catch (Exception e) {
                LOGGER.warn("Failed to subscribe topics {} from subscription provider {}, exception: {}, try next subscription provider...", set, candidate, e.getMessage());
                continue;
            }
            // First provider that accepts the request ends the search.
            return;
        }
        throw NO_PROVIDERS_EXCEPTION;
    }

    /**
     * Sends the unsubscribe request to the available providers one after another until one of
     * them accepts it. A failing provider is logged and the next one is tried.
     *
     * <p>This method does not take the providers lock itself.
     *
     * @param set the topics to unsubscribe from
     * @throws IoTDBConnectionException if there is no available provider or every one failed
     */
    public void unsubscribeWithRedirection(Set<String> set) throws IoTDBConnectionException {
        final List<SubscriptionProvider> candidates = getAllAvailableProviders();
        for (final SubscriptionProvider candidate : candidates) {
            try {
                candidate.getSessionConnection().unsubscribe(set);
            } catch (Exception e) {
                LOGGER.warn("Failed to unsubscribe topics {} from subscription provider {}, exception: {}, try next subscription provider...", set, candidate, e.getMessage());
                continue;
            }
            // First provider that accepts the request ends the search.
            return;
        }
        throw NO_PROVIDERS_EXCEPTION;
    }

    /**
     * Asks the available providers one after another for the endpoint map and returns the first
     * answer obtained. A failing provider is logged and the next one is tried.
     *
     * @return the endpoints keyed by integer id, as returned by the first provider that answers
     * @throws IoTDBConnectionException if there is no available provider, every one failed, or
     *     the first provider that answered returned {@code null}
     */
    public Map<Integer, TEndPoint> fetchAllEndPointsWithRedirection() throws IoTDBConnectionException {
        for (final SubscriptionProvider candidate : getAllAvailableProviders()) {
            Map<Integer, TEndPoint> endPoints;
            try {
                endPoints = candidate.getSessionConnection().fetchAllEndPoints();
            } catch (Exception e) {
                LOGGER.warn("Failed to fetch all endpoints from subscription provider {}, exception: {}, try next subscription provider...", candidate, e.getMessage());
                continue;
            }
            if (endPoints == null) {
                // A null answer ends the search without trying further providers.
                throw NO_PROVIDERS_EXCEPTION;
            }
            return endPoints;
        }
        throw NO_PROVIDERS_EXCEPTION;
    }
}
