package org.apache.helix.manager.zk;

import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixConnection;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixParticipant;
import org.apache.helix.HelixTimerTask;
import org.apache.helix.InstanceType;
import org.apache.helix.LiveInstanceInfoProvider;
import org.apache.helix.PreConnectCallback;
import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.accessor.ClusterAccessor;
import org.apache.helix.api.config.ParticipantConfig;
import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.Id;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.messaging.DefaultMessagingService;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.participant.HelixStateMachineEngine;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
import org.apache.log4j.Logger;
import org.apache.zookeeper.data.Stat;

/* loaded from: input_file:org/apache/helix/manager/zk/ZkHelixParticipant.class */
/**
 * {@link HelixParticipant} implementation that runs on top of a shared {@link ZkHelixConnection}.
 *
 * <p>On connect the participant (in this order) verifies/joins the cluster, runs the registered
 * pre-connect callbacks, creates its ephemeral live-instance node, carries over current states
 * left behind by previous sessions, registers its message handlers, starts timer tasks and
 * initializes the connection's handlers for this role.
 */
public class ZkHelixParticipant implements HelixParticipant {
    private static final Logger LOG = Logger.getLogger(ZkHelixParticipant.class);

    /** Extra time (ms) added to the session timeout while waiting for a stale live-instance to expire. */
    private static final long LIVE_INSTANCE_EXPIRY_GRACE_MS = 5000;

    final ZkHelixConnection _connection;
    final ClusterId _clusterId;
    final ParticipantId _participantId;
    final ZKHelixDataAccessor _accessor;
    final BaseDataAccessor<ZNRecord> _baseAccessor;
    final PropertyKey.Builder _keyBuilder;
    final ConfigAccessor _configAccessor;
    final ClusterAccessor _clusterAccessor;
    final DefaultMessagingService _messagingService;
    final StateMachineEngine _stateMachineEngine;
    final List<PreConnectCallback> _preConnectCallbacks = new ArrayList<PreConnectCallback>();
    final List<HelixTimerTask> _timerTasks = new ArrayList<HelixTimerTask>();
    boolean _isStarted;
    LiveInstanceInfoProvider _liveInstanceInfoProvider;

    /**
     * Creates a participant bound to the given connection, cluster and participant id.
     *
     * @param zkHelixConnection connection used to create all accessors and the messaging service
     * @param clusterId         cluster this participant belongs to
     * @param participantId     identity of this participant
     */
    public ZkHelixParticipant(ZkHelixConnection zkHelixConnection, ClusterId clusterId, ParticipantId participantId) {
        this._connection = zkHelixConnection;
        this._accessor = (ZKHelixDataAccessor) zkHelixConnection.createDataAccessor(clusterId);
        this._baseAccessor = this._accessor.getBaseDataAccessor();
        this._keyBuilder = this._accessor.keyBuilder();
        this._clusterAccessor = zkHelixConnection.createClusterAccessor(clusterId);
        this._configAccessor = zkHelixConnection.getConfigAccessor();
        this._clusterId = clusterId;
        this._participantId = participantId;
        this._messagingService = (DefaultMessagingService) zkHelixConnection.createMessagingService(this);
        // Built last so that `this` is only handed out once every other field has been assigned;
        // as a field initializer it would run before any of the assignments above.
        this._stateMachineEngine = new HelixStateMachineEngine(new ZKHelixManager(this));
    }

    @Override // org.apache.helix.HelixRole
    public ClusterId getClusterId() {
        return this._clusterId;
    }

    @Override // org.apache.helix.HelixParticipant
    public ParticipantId getParticipantId() {
        return this._participantId;
    }

    @Override // org.apache.helix.HelixRole
    public HelixConnection getConnection() {
        return this._connection;
    }

    /** Starts every registered timer task. */
    void startTimerTasks() {
        for (HelixTimerTask task : this._timerTasks) {
            task.start();
        }
    }

    /** Stops every registered timer task. */
    void stopTimerTasks() {
        for (HelixTimerTask task : this._timerTasks) {
            task.stop();
        }
    }

    /**
     * Stops timer tasks, resets this role's handlers on the connection and resets the base data
     * accessor.
     *
     * <p>NOTE(review): the decompiler reports this method was originally package-private.
     */
    public void reset() {
        stopTimerTasks();
        this._connection.resetHandlers(this);
        this._accessor.getBaseDataAccessor().reset();
    }

    /**
     * Creates the ephemeral live-instance node for this participant under the current session.
     *
     * <p>If the node already exists there are three cases:
     * <ul>
     *   <li>it disappeared between the failed create and the read: retry immediately;</li>
     *   <li>it is owned by the current session: make sure its recorded session id matches;</li>
     *   <li>it is owned by a different session: wait for session-timeout plus a grace period and
     *       make one final create attempt.</li>
     * </ul>
     *
     * @throws HelixException if the node cannot be created or its session id cannot be updated
     */
    private void createLiveInstance() {
        String liveInstancePath = this._keyBuilder.liveInstance(this._participantId.stringify()).getPath();
        String sessionId = this._connection.getSessionId().stringify();

        LiveInstance liveInstance = new LiveInstance(this._participantId.stringify());
        liveInstance.setSessionId(sessionId);
        liveInstance.setHelixVersion(this._connection.getHelixVersion());
        liveInstance.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());

        if (this._liveInstanceInfoProvider != null) {
            LOG.info("Additional live instance information is provided: " + this._liveInstanceInfoProvider);
            ZNRecord additionalInfo = this._liveInstanceInfoProvider.getAdditionalLiveInstanceInfo();
            if (additionalInfo != null) {
                // The standard fields are merged into the provider's record, which is then
                // re-wrapped under this participant's id.
                additionalInfo.merge(liveInstance.getRecord());
                liveInstance = new LiveInstance(new ZNRecord(additionalInfo, this._participantId.stringify()));
                LOG.info("Participant: " + this._participantId + ", mergedLiveInstance: " + liveInstance);
            }
        }

        boolean retry;
        do {
            retry = false;
            boolean created = this._baseAccessor.create(liveInstancePath, liveInstance.getRecord(), AccessOption.EPHEMERAL);
            if (!created) {
                LOG.warn("found another participant with same name: " + this._participantId + " in cluster " + this._clusterId);
                Stat stat = new Stat();
                ZNRecord existingRecord = this._baseAccessor.get(liveInstancePath, stat, 0);
                if (existingRecord == null) {
                    // Node went away between the failed create and this read: try again.
                    retry = true;
                } else {
                    String ephemeralOwner = Long.toHexString(stat.getEphemeralOwner());
                    if (ephemeralOwner.equals(sessionId)) {
                        // The node belongs to our own session; only its recorded session id may be stale.
                        LiveInstance existingLiveInstance = new LiveInstance(existingRecord);
                        if (!existingLiveInstance.getSessionId().equals(sessionId)) {
                            LOG.info("overwriting session-id by ephemeralOwner: " + ephemeralOwner + ", old-sessionId: " + existingLiveInstance.getSessionId() + ", new-sessionId: " + sessionId);
                            existingLiveInstance.setSessionId(sessionId);
                            boolean updated = this._baseAccessor.set(liveInstancePath, existingLiveInstance.getRecord(), stat.getVersion(), AccessOption.EPHEMERAL);
                            if (!updated) {
                                LOG.error("Someone changes sessionId as we update, should not happen");
                                throw new HelixException("fail to create live-instance for " + this._participantId);
                            }
                        }
                    } else {
                        // Node is owned by another session: wait for it to expire, then fall
                        // through to the single final attempt below.
                        try {
                            TimeUnit.MILLISECONDS.sleep(this._connection.getSessionTimeout() + LIVE_INSTANCE_EXPIRY_GRACE_MS);
                        } catch (InterruptedException e) {
                            // NOTE(review): the interrupt status is not restored; the final create
                            // attempt below still runs after an interrupted sleep. Confirm whether
                            // the interrupt should be propagated instead.
                            LOG.warn("Sleep interrupted while waiting for previous live-instance to go away", e);
                        }
                        retry = true;
                        break;
                    }
                }
            }
        } while (retry);

        // `retry` is only still set here when we waited for a foreign session: give it a last try.
        if (retry && !this._baseAccessor.create(liveInstancePath, liveInstance.getRecord(), AccessOption.EPHEMERAL)) {
            LOG.error("instance: " + this._participantId + " already has a live-instance in cluster " + this._clusterId);
            throw new HelixException("fail to create live-instance for " + this._participantId);
        }
    }

    /**
     * Carries current states recorded under previous sessions of this participant over to the
     * current session, then removes the current states of those previous sessions.
     *
     * <p>A previous current state is skipped (with an error log) when it has no state model
     * definition reference or when the referenced definition cannot be read.
     */
    private void carryOverPreviousCurrentState() {
        String sessionId = this._connection.getSessionId().stringify();
        String participantName = this._participantId.stringify();
        List<String> sessionIds = this._accessor.getChildNames(this._keyBuilder.sessions(participantName));

        for (String previousSessionId : sessionIds) {
            if (previousSessionId.equals(sessionId)) {
                continue;
            }
            List<CurrentState> previousCurrentStates = this._accessor.getChildValues(this._keyBuilder.currentStates(participantName, previousSessionId));
            for (CurrentState previousCurrentState : previousCurrentStates) {
                LOG.info("Carrying over old session: " + previousSessionId + ", resource: " + previousCurrentState.getId() + " to current session: " + sessionId);
                String stateModelDefRef = previousCurrentState.getStateModelDefRef();
                if (stateModelDefRef == null) {
                    LOG.error("skip carry-over because previous current state doesn't have a state model definition. previous current-state: " + previousCurrentState);
                    continue;
                }
                StateModelDefinition stateModelDef = (StateModelDefinition) this._accessor.getProperty(this._keyBuilder.stateModelDef(stateModelDefRef));
                if (stateModelDef == null) {
                    // Without the definition there is no initial state to carry over to; skip this
                    // resource instead of failing the whole connect sequence with an NPE.
                    LOG.error("skip carry-over because state model definition: " + stateModelDefRef + " is not found. previous current-state: " + previousCurrentState);
                    continue;
                }
                String currentStatePath = this._keyBuilder.currentState(participantName, sessionId, previousCurrentState.getResourceName()).getPath();
                this._accessor.getBaseDataAccessor().update(currentStatePath, new CurStateCarryOverUpdater(sessionId, stateModelDef.getInitialState(), previousCurrentState), AccessOption.PERSISTENT);
            }
        }

        // Clean up only after every previous session has been carried over.
        for (String previousSessionId : sessionIds) {
            if (previousSessionId.equals(sessionId)) {
                continue;
            }
            PropertyKey previousCurrentStatesKey = this._keyBuilder.currentStates(participantName, previousSessionId);
            LOG.info("Removing current states from previous sessions. path: " + previousCurrentStatesKey.getPath());
            this._accessor.removeProperty(previousCurrentStatesKey);
        }
    }

    /**
     * Ensures this participant is set up in the cluster, auto-joining it when the cluster-level
     * {@code allowParticipantAutoJoin} config is {@code true}.
     *
     * <p>When auto-joining, a participant id of the form {@code <host>_<port>} is split at the last
     * underscore into host name and port; otherwise the whole id is used as host name and the port
     * is {@code -1}.
     *
     * @throws HelixException if the instance is not set up and auto-join is not enabled
     */
    private void joinCluster() {
        boolean autoJoin = false;
        try {
            HelixConfigScope clusterScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(this._clusterId.stringify()).build();
            autoJoin = Boolean.parseBoolean(this._configAccessor.get(clusterScope, "allowParticipantAutoJoin"));
            LOG.info("instance: " + this._participantId + " auto-joining " + this._clusterId + " is " + autoJoin);
        } catch (Exception e) {
            // Best effort: a failed config read leaves auto-join disabled, but should be visible.
            LOG.warn("Failed to read allowParticipantAutoJoin for cluster " + this._clusterId + ", treating auto-join as disabled", e);
        }

        if (ZKUtil.isInstanceSetup(this._connection._zkclient, this._clusterId.toString(), this._participantId.toString(), getType())) {
            return;
        }
        if (!autoJoin) {
            throw new HelixException("Initial cluster structure is not set up for instance: " + this._participantId + ", instanceType: " + getType());
        }

        LOG.info(this._participantId + " is auto-joining cluster: " + this._clusterId);
        String participantName = this._participantId.stringify();
        String hostName = participantName;
        int port = -1;
        int separatorIndex = participantName.lastIndexOf("_");
        if (separatorIndex > 0) {
            hostName = participantName.substring(0, separatorIndex);
            try {
                port = Integer.parseInt(participantName.substring(separatorIndex + 1));
            } catch (NumberFormatException ignored) {
                // Suffix is not a number: keep the default port of -1.
            }
        }
        this._clusterAccessor.addParticipant(new ParticipantConfig.Builder(this._participantId).hostName(hostName).port(port).enabled(true).build());
    }

    /**
     * Registers the state machine engine as the STATE_TRANSITION message handler, adds this
     * participant as a message listener, registers the scheduler-task-queue state model factory
     * and finally notifies the messaging service that it is connected.
     */
    private void setupMsgHandler() {
        this._messagingService.registerMessageHandlerFactory(Message.MessageType.STATE_TRANSITION.toString(), this._stateMachineEngine);
        this._connection.addMessageListener(this, this._messagingService.getExecutor(), this._clusterId, this._participantId);
        this._stateMachineEngine.registerStateModelFactory(StateModelDefId.SchedulerTaskQueue, new ScheduledTaskStateModelFactory(this._messagingService.getExecutor()));
        this._messagingService.onConnected();
    }

    /**
     * Runs the full participant start-up sequence against an already-connected connection.
     *
     * <p>NOTE(review): the decompiler reports this method was originally package-private.
     *
     * @throws HelixException if the cluster is not set up, the participant cannot join, or the
     *                        live-instance node cannot be created
     */
    public void init() {
        if (!ZKUtil.isClusterSetup(this._clusterId.toString(), this._connection._zkclient)) {
            throw new HelixException("Cluster structure is not set up for cluster: " + this._clusterId);
        }
        joinCluster();
        for (PreConnectCallback callback : this._preConnectCallbacks) {
            callback.onPreConnect();
        }
        createLiveInstance();
        carryOverPreviousCurrentState();
        setupMsgHandler();
        startTimerTasks();
        this._connection.initHandlers(this);
    }

    /** Resets any previous state, re-runs {@link #init()} and marks the participant as started. */
    @Override // org.apache.helix.HelixConnectionStateListener
    public void onConnected() {
        reset();
        init();
        this._isStarted = true;
    }

    /**
     * Resets this participant, shuts down the messaging executor, removes the live-instance node
     * and marks the participant as stopped.
     */
    @Override // org.apache.helix.HelixConnectionStateListener
    public void onDisconnecting() {
        LOG.info("disconnecting " + this._participantId + "(" + getType() + ") from " + this._clusterId);
        reset();
        this._messagingService.getExecutor().shutdown();
        this._accessor.removeProperty(this._keyBuilder.liveInstance(this._participantId.stringify()));
        this._isStarted = false;
    }

    /**
     * Registers this participant as a connection-state listener and, if the connection is already
     * up, runs the connect sequence immediately.
     */
    @Override // org.apache.helix.HelixService
    public void start() {
        this._connection.addConnectionStateListener(this);
        if (this._connection.isConnected()) {
            onConnected();
        }
    }

    /** Unregisters this participant from the connection and runs the disconnect sequence. */
    @Override // org.apache.helix.HelixService
    public void stop() {
        this._connection.removeConnectionStateListener(this);
        onDisconnecting();
    }

    @Override // org.apache.helix.HelixService
    public boolean isStarted() {
        return this._isStarted;
    }

    @Override // org.apache.helix.HelixRole
    public ClusterMessagingService getMessagingService() {
        return this._messagingService;
    }

    @Override // org.apache.helix.HelixParticipant
    public StateMachineEngine getStateMachineEngine() {
        return this._stateMachineEngine;
    }

    @Override // org.apache.helix.HelixRole
    public Id getId() {
        return getParticipantId();
    }

    @Override // org.apache.helix.HelixRole
    public InstanceType getType() {
        return InstanceType.PARTICIPANT;
    }

    /**
     * Registers a callback that {@link #init()} invokes after joining the cluster and before the
     * live-instance node is created.
     *
     * @param preConnectCallback callback to add
     */
    @Override // org.apache.helix.HelixParticipant
    public void addPreConnectCallback(PreConnectCallback preConnectCallback) {
        LOG.info("Adding preconnect callback: " + preConnectCallback);
        this._preConnectCallbacks.add(preConnectCallback);
    }

    /**
     * Sets the provider whose additional info is merged into the live-instance record the next
     * time it is created.
     *
     * @param liveInstanceInfoProvider provider to use; {@code null} disables the merge
     */
    @Override // org.apache.helix.HelixParticipant
    public void setLiveInstanceInfoProvider(LiveInstanceInfoProvider liveInstanceInfoProvider) {
        this._liveInstanceInfoProvider = liveInstanceInfoProvider;
    }

    @Override // org.apache.helix.HelixRole
    public HelixDataAccessor getAccessor() {
        return this._accessor;
    }

    public ClusterAccessor getClusterAccessor() {
        return this._clusterAccessor;
    }
}
