/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.tubemq.client.factory;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.inlong.tubemq.client.config.ConsumerConfig;
import org.apache.inlong.tubemq.client.config.TubeClientConfig;
import org.apache.inlong.tubemq.client.config.TubeClientConfigUtils;
import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
import org.apache.inlong.tubemq.client.consumer.PushMessageConsumer;
import org.apache.inlong.tubemq.client.exception.TubeClientException;
import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
import org.apache.inlong.tubemq.client.factory.TubeBaseSessionFactory;
import org.apache.inlong.tubemq.client.producer.MessageProducer;
import org.apache.inlong.tubemq.corebase.Shutdownable;
import org.apache.inlong.tubemq.corerpc.RpcConfig;
import org.apache.inlong.tubemq.corerpc.client.ClientFactory;
import org.apache.inlong.tubemq.corerpc.netty.NettyClientFactory;

/**
 * A {@link MessageSessionFactory} whose instances all share one static
 * {@link NettyClientFactory} and one static {@link TubeBaseSessionFactory}.
 *
 * <p>A static reference counter tracks how many instances are alive. The instance
 * that raises the counter to 1 initializes the shared resources; the
 * {@link #shutdown()} call that drops it back to 0 releases them.
 *
 * <p>NOTE(review): a constructor that raises the counter to 1 while another thread
 * is still inside the final {@link #shutdown()} teardown is not serialized against
 * that teardown — confirm whether callers can hit this.
 */
public class TubeSingleSessionFactory
implements MessageSessionFactory {
    private static final NettyClientFactory clientFactory = new NettyClientFactory();
    /** {@code true} while the shared resources are not initialized (initial state and after final shutdown). */
    private static final AtomicBoolean isShutDown = new AtomicBoolean(true);
    /** Number of instances that currently hold a reference to the shared resources. */
    private static final AtomicLong referenceCounter = new AtomicLong(0L);
    // volatile: removeClient() reads this field without going through isShutDown first.
    private static volatile TubeBaseSessionFactory baseSessionFactory;

    /** Set once this particular instance has released its reference, so it is released at most once. */
    private final AtomicBoolean instanceReleased = new AtomicBoolean(false);

    /**
     * Acquires a reference to the shared session factory, initializing it when this is
     * the first live instance. Other callers wait (polling every 50 ms) until the shared
     * state is marked as initialized, or until the waiting thread is interrupted.
     *
     * @param tubeClientConfig the client configuration used to build the RPC config and
     *                         the shared base session factory
     * @throws TubeClientException if creating the shared base session factory fails
     */
    public TubeSingleSessionFactory(TubeClientConfig tubeClientConfig) throws TubeClientException {
        if (referenceCounter.incrementAndGet() == 1L) {
            boolean initialized = false;
            try {
                RpcConfig config = TubeClientConfigUtils.getRpcConfigByClientConfig(tubeClientConfig, true);
                clientFactory.configure(config);
                baseSessionFactory = new TubeBaseSessionFactory((ClientFactory)clientFactory, tubeClientConfig);
                isShutDown.set(false);
                initialized = true;
            } finally {
                if (!initialized) {
                    // Give the reference back so a later constructor can retry the
                    // initialization instead of waiting forever on isShutDown.
                    referenceCounter.decrementAndGet();
                }
            }
        }
        while (isShutDown.get()) {
            try {
                Thread.sleep(50L);
            } catch (InterruptedException e) {
                // Stop waiting, but keep the interrupt visible to the caller.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    /**
     * Releases this instance's reference. When the last reference is released, the
     * shared base session factory and the shared client factory are shut down and the
     * shared state is marked as shut down.
     *
     * <p>Calling this again on the same instance while the shared state is still
     * initialized is a no-op, so one instance cannot release references held by others.
     *
     * @throws TubeClientException if the shared state is not initialized, or if shutting
     *                             down the base session factory fails
     */
    @Override
    public void shutdown() throws TubeClientException {
        ensureInitialized();
        if (!instanceReleased.compareAndSet(false, true)) {
            return;
        }
        if (referenceCounter.decrementAndGet() > 0L) {
            return;
        }
        try {
            baseSessionFactory.shutdown();
        } finally {
            // Always finish the teardown, even if the base factory failed to shut down.
            try {
                clientFactory.shutdown();
            } finally {
                isShutDown.set(true);
            }
        }
    }

    /**
     * Forwards the removal to the shared base session factory; does nothing when the
     * base session factory has never been created.
     *
     * @param client the client to remove
     */
    @Override
    public <T extends Shutdownable> void removeClient(T client) {
        TubeBaseSessionFactory sharedFactory = baseSessionFactory;
        if (sharedFactory == null) {
            return;
        }
        sharedFactory.removeClient(client);
    }

    /**
     * Creates a producer through the shared base session factory.
     *
     * @return the producer created by the shared base session factory
     * @throws TubeClientException if the shared state is not initialized, or if the
     *                             base session factory fails
     */
    @Override
    public MessageProducer createProducer() throws TubeClientException {
        ensureInitialized();
        return baseSessionFactory.createProducer();
    }

    /**
     * Creates a push consumer through the shared base session factory.
     *
     * @param consumerConfig the consumer configuration passed to the base session factory
     * @return the push consumer created by the shared base session factory
     * @throws TubeClientException if the shared state is not initialized, or if the
     *                             base session factory fails
     */
    @Override
    public PushMessageConsumer createPushConsumer(ConsumerConfig consumerConfig) throws TubeClientException {
        ensureInitialized();
        return baseSessionFactory.createPushConsumer(consumerConfig);
    }

    /**
     * Creates a pull consumer through the shared base session factory.
     *
     * @param consumerConfig the consumer configuration passed to the base session factory
     * @return the pull consumer created by the shared base session factory
     * @throws TubeClientException if the shared state is not initialized, or if the
     *                             base session factory fails
     */
    @Override
    public PullMessageConsumer createPullConsumer(ConsumerConfig consumerConfig) throws TubeClientException {
        ensureInitialized();
        return baseSessionFactory.createPullConsumer(consumerConfig);
    }

    /**
     * Returns the client factory shared by all instances of this class.
     *
     * @return the shared {@link NettyClientFactory}
     */
    public NettyClientFactory getRpcServiceFactory() {
        return clientFactory;
    }

    /** Fails with the standard message when the shared state is not initialized. */
    private static void ensureInitialized() throws TubeClientException {
        if (isShutDown.get()) {
            throw new TubeClientException("Please initialize the object first!");
        }
    }
}

