package com.taosdata.jdbc.tmq;

import com.taosdata.jdbc.TSDBError;
import com.taosdata.jdbc.utils.StringUtils;
import com.taosdata.jdbc.utils.Utils;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/* loaded from: input_file:com/taosdata/jdbc/tmq/TaosConsumer.class */
public class TaosConsumer<V> implements TConsumer<V> {
    private static final long NO_CURRENT_THREAD = -1;
    private Deserializer<V> deserializer;
    long resultSet;
    private OffsetCommitCallback callback;
    private final AtomicLong currentThread = new AtomicLong(-1);
    private final AtomicInteger refcount = new AtomicInteger(0);
    private volatile boolean closed = false;
    List<V> list = new ArrayList();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(10), runnable -> {
        Thread thread = new Thread(runnable);
        thread.setName("consumer-callback-" + thread.getId());
        return thread;
    }, new ThreadPoolExecutor.CallerRunsPolicy());
    private final TMQConnector connector = new TMQConnector();

    public TaosConsumer(Properties properties) throws SQLException {
        if (null == properties) {
            throw TSDBError.createSQLException(-100, "consumer properties must not be null!");
        }
        String property = properties.getProperty(TMQConstants.BOOTSTRAP_SERVERS);
        if (!StringUtils.isEmpty(property)) {
            Arrays.stream(property.split(",")).filter(str -> {
                return !StringUtils.isEmpty(str);
            }).findFirst().ifPresent(str2 -> {
                String[] split = str2.split(":");
                properties.setProperty(TMQConstants.CONNECT_IP, split[0]);
                properties.setProperty(TMQConstants.CONNECT_PORT, split[1]);
            });
        }
        String property2 = properties.getProperty(TMQConstants.VALUE_DESERIALIZER);
        if (StringUtils.isEmpty(property2)) {
            this.deserializer = new MapDeserializer();
        } else {
            this.deserializer = (Deserializer) Utils.newInstance(Utils.parseClassType(property2));
        }
        this.deserializer.configure(properties);
        long createConfig = this.connector.createConfig(properties);
        try {
            this.connector.createConsumer(createConfig);
            this.connector.destroyConf(createConfig);
        } catch (Throwable th) {
            this.connector.destroyConf(createConfig);
            throw th;
        }
    }

    public void commitCallbackHandler(int i) {
        CallbackResult callbackResult = new CallbackResult(i, this.list);
        if (0 == i) {
            this.executor.submit(() -> {
                this.callback.onComplete(callbackResult, null);
            });
        } else {
            SQLException createSQLException = TSDBError.createSQLException(i, this.connector.getErrMsg(i));
            this.executor.submit(() -> {
                this.callback.onComplete(callbackResult, createSQLException);
            });
        }
    }

    @Override // com.taosdata.jdbc.tmq.TConsumer
    public void subscribe(Collection<String> collection) throws SQLException {
        acquireAndEnsureOpen();
        long j = 0;
        try {
            j = this.connector.createTopic(collection);
            this.connector.subscribe(j);
            if (j != 0) {
                this.connector.destroyTopic(j);
            }
            release();
        } catch (Throwable th) {
            if (j != 0) {
                this.connector.destroyTopic(j);
            }
            release();
            throw th;
        }
    }

    @Override // com.taosdata.jdbc.tmq.TConsumer
    public void unsubscribe() throws SQLException {
        acquireAndEnsureOpen();
        try {
            this.connector.unsubscribe();
        } finally {
            release();
        }
    }

    @Override // com.taosdata.jdbc.tmq.TConsumer
    public Set<String> subscription() throws SQLException {
        acquireAndEnsureOpen();
        try {
            return this.connector.subscription();
        } finally {
            release();
        }
    }

    @Override // com.taosdata.jdbc.tmq.TConsumer
    public ConsumerRecords<V> poll(Duration duration) throws SQLException {
        acquireAndEnsureOpen();
        try {
            this.resultSet = this.connector.poll(duration.toMillis());
            this.list = new ArrayList();
            if (this.resultSet == 0 || this.resultSet == -120) {
                ConsumerRecords<V> empty = ConsumerRecords.empty();
                release();
                return empty;
            }
            int resultTimePrecision = this.connector.getResultTimePrecision(this.resultSet);
            HashMap hashMap = new HashMap();
            Object obj = null;
            TMQResultSet tMQResultSet = new TMQResultSet(this.connector, this.resultSet, resultTimePrecision);
            Throwable th = null;
            while (tMQResultSet.next()) {
                try {
                    try {
                        TopicPartition topicPartition = new TopicPartition(this.connector.getTopicName(this.resultSet), this.connector.getDbName(this.resultSet), this.connector.getVgroupId(this.resultSet), this.connector.getTableName(this.resultSet));
                        if (!topicPartition.equals(obj)) {
                            hashMap.put(obj, this.list);
                            obj = topicPartition;
                            this.list = new ArrayList();
                        }
                        try {
                            this.list.add(this.deserializer.deserialize(tMQResultSet));
                        } catch (Exception e) {
                            throw new DeserializerException("Deserializer error", e);
                        }
                    } finally {
                    }
                } finally {
                }
            }
            if (tMQResultSet != null) {
                if (0 != 0) {
                    try {
                        tMQResultSet.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    tMQResultSet.close();
                }
            }
            hashMap.put(obj, this.list);
            ConsumerRecords<V> consumerRecords = new ConsumerRecords<>(hashMap);
            release();
            return consumerRecords;
        } catch (Throwable th3) {
            release();
            throw th3;
        }
    }

    @Override // com.taosdata.jdbc.tmq.TConsumer
    public void commitAsync() {
        this.connector.asyncCommit(0L, this);
    }

    @Override // com.taosdata.jdbc.tmq.TConsumer
    public void commitAsync(OffsetCommitCallback offsetCommitCallback) {
        this.callback = offsetCommitCallback;
        this.connector.asyncCommit(0L, this);
    }

    @Override // com.taosdata.jdbc.tmq.TConsumer
    public void commitSync() throws SQLException {
        this.connector.syncCommit(0L);
    }

    private void acquireAndEnsureOpen() {
        acquire();
        if (this.closed) {
            release();
            throw new IllegalStateException("This consumer has already been closed.");
        }
    }

    private void acquire() {
        long id = Thread.currentThread().getId();
        if (id != this.currentThread.get() && !this.currentThread.compareAndSet(-1L, id)) {
            throw new ConcurrentModificationException("Consumer is not safe for multi-threaded access");
        }
        this.refcount.incrementAndGet();
    }

    private void release() {
        if (this.refcount.decrementAndGet() == 0) {
            this.currentThread.set(-1L);
        }
    }

    @Override // com.taosdata.jdbc.tmq.TConsumer, java.lang.AutoCloseable
    public void close() throws SQLException {
        acquire();
        try {
            this.executor.shutdown();
            this.connector.closeConsumer();
        } finally {
            this.closed = true;
            release();
        }
    }
}
