package org.apache.flink.mesos.runtime.clusterframework.store;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
import org.apache.flink.runtime.persistence.IntegerResourceVersion;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.zookeeper.ZooKeeperSharedCount;
import org.apache.flink.runtime.zookeeper.ZooKeeperSharedValue;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.runtime.zookeeper.ZooKeeperVersionedValue;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.mesos.Protos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;

/**
 * A {@link MesosWorkerStore} backed by ZooKeeper.
 *
 * <p>The store keeps three pieces of state: the Mesos framework ID (a shared value), a
 * counter used to generate task IDs (a shared count), and the set of known workers (a
 * state handle store in which each worker lives under the path {@code /<taskID>}).
 *
 * <p>Every public operation synchronizes on one internal lock. All operations other than
 * {@link #start()} and {@link #stop(boolean)} require the store to be running and fail with
 * an {@link IllegalStateException} otherwise.
 */
@Deprecated
public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperMesosWorkerStore.class);

    /** Guards {@link #isRunning} and serializes all store operations. */
    private final Object startStopLock = new Object();

    /** Whether {@link #start()} has been called without a subsequent successful stop. */
    private boolean isRunning;

    /** Holds the framework ID as bytes; an empty array means "no framework ID". */
    private final ZooKeeperSharedValue frameworkIdInZooKeeper;

    /** Counter from which new task IDs are derived. */
    private final ZooKeeperSharedCount totalTaskCountInZooKeeper;

    /** Persistent store of the workers, keyed by the path derived from their task ID. */
    private final ZooKeeperStateHandleStore<MesosWorkerStore.Worker> workersInZooKeeper;

    /**
     * Creates the store on top of the given ZooKeeper primitives.
     *
     * @param workersInZooKeeper state handle store holding the workers; must not be null
     * @param frameworkIdInZooKeeper shared value holding the framework ID; must not be null
     * @param totalTaskCountInZooKeeper shared counter used for task IDs; must not be null
     * @throws Exception declared for compatibility with existing callers
     */
    public ZooKeeperMesosWorkerStore(
            ZooKeeperStateHandleStore<MesosWorkerStore.Worker> workersInZooKeeper,
            ZooKeeperSharedValue frameworkIdInZooKeeper,
            ZooKeeperSharedCount totalTaskCountInZooKeeper) throws Exception {
        this.workersInZooKeeper = Preconditions.checkNotNull(workersInZooKeeper, "workersInZooKeeper");
        this.frameworkIdInZooKeeper = Preconditions.checkNotNull(frameworkIdInZooKeeper, "frameworkIdInZooKeeper");
        this.totalTaskCountInZooKeeper = Preconditions.checkNotNull(totalTaskCountInZooKeeper, "totalTaskCountInZooKeeper");
    }

    /**
     * Starts the shared framework ID value and the shared task counter. Calling this on an
     * already running store is a no-op.
     *
     * @throws Exception if one of the shared ZooKeeper values fails to start
     */
    @Override
    public void start() throws Exception {
        synchronized (startStopLock) {
            if (isRunning) {
                return;
            }
            // The flag is set before the components are started, so a failed start is
            // not re-attempted by a later call to start().
            isRunning = true;
            frameworkIdInZooKeeper.start();
            totalTaskCountInZooKeeper.start();
        }
    }

    /**
     * Stops the store. Calling this on a store that is not running is a no-op.
     *
     * <p>Both shared values are always asked to close, even if closing the first one fails;
     * the first failure is rethrown with any later close failure attached as suppressed.
     * The store is only marked as stopped when closing (and, if requested, cleanup)
     * succeeded.
     *
     * @param cleanup if {@code true}, additionally releases and tries to remove all workers
     *     from the state handle store
     * @throws Exception if closing a shared value or the cleanup fails
     */
    @Override
    public void stop(boolean cleanup) throws Exception {
        synchronized (startStopLock) {
            if (!isRunning) {
                return;
            }

            Exception closeFailure = null;
            try {
                frameworkIdInZooKeeper.close();
            } catch (Exception e) {
                closeFailure = e;
            }
            try {
                // Attempted regardless of the outcome above so the counter is not leaked.
                totalTaskCountInZooKeeper.close();
            } catch (Exception e) {
                if (closeFailure == null) {
                    closeFailure = e;
                } else {
                    closeFailure.addSuppressed(e);
                }
            }
            if (closeFailure != null) {
                throw closeFailure;
            }

            if (cleanup) {
                workersInZooKeeper.releaseAndTryRemoveAll();
            }
            isRunning = false;
        }
    }

    /** Fails with an {@link IllegalStateException} unless the store is running. */
    private void verifyIsRunning() {
        Preconditions.checkState(isRunning, "Not running. Forgot to call start()?");
    }

    /**
     * Reads the framework ID from the shared value.
     *
     * @return the stored framework ID, or an empty option if the stored value is empty
     * @throws Exception if the value cannot be read
     */
    @Override
    public Option<Protos.FrameworkID> getFrameworkID() throws Exception {
        synchronized (startStopLock) {
            verifyIsRunning();
            byte[] storedId = frameworkIdInZooKeeper.getValue();
            if (storedId.length == 0) {
                return Option.empty();
            }
            String idValue = new String(storedId, ConfigConstants.DEFAULT_CHARSET);
            return Option.apply(Protos.FrameworkID.newBuilder().setValue(idValue).build());
        }
    }

    /**
     * Writes the framework ID to the shared value.
     *
     * @param option the framework ID to store; an undefined option is stored as an empty
     *     byte array
     * @throws Exception if the value cannot be written
     */
    @Override
    public void setFrameworkID(Option<Protos.FrameworkID> option) throws Exception {
        synchronized (startStopLock) {
            verifyIsRunning();
            byte[] encodedId = option.isDefined()
                    ? option.get().getValue().getBytes(ConfigConstants.DEFAULT_CHARSET)
                    : new byte[0];
            frameworkIdInZooKeeper.setValue(encodedId);
        }
    }

    /**
     * Generates a new task ID by incrementing the shared counter.
     *
     * @return a task ID whose value is the incremented count formatted with
     *     {@code TASKID_FORMAT}
     * @throws Exception if the counter cannot be read or updated
     */
    @Override
    public Protos.TaskID newTaskID() throws Exception {
        synchronized (startStopLock) {
            verifyIsRunning();
            int nextCount;
            ZooKeeperVersionedValue<Integer> observed;
            // Compare-and-set loop: re-read and retry until the update is accepted for
            // the version that was observed.
            do {
                observed = totalTaskCountInZooKeeper.getVersionedValue();
                nextCount = observed.getValue() + 1;
            } while (!totalTaskCountInZooKeeper.trySetCount(observed, nextCount));
            return Protos.TaskID.newBuilder().setValue(TASKID_FORMAT.format(nextCount)).build();
        }
    }

    /**
     * Retrieves all workers currently held by the state handle store.
     *
     * @return the recovered workers; an empty list if the store holds none
     * @throws FlinkException if a state handle cannot be read or deserialized
     * @throws Exception if the state handles cannot be fetched
     */
    @Override
    public List<MesosWorkerStore.Worker> recoverWorkers() throws Exception {
        synchronized (startStopLock) {
            verifyIsRunning();
            List<Tuple2<RetrievableStateHandle<MesosWorkerStore.Worker>, String>> handles =
                    workersInZooKeeper.getAllAndLock();
            if (handles.isEmpty()) {
                return Collections.emptyList();
            }

            List<MesosWorkerStore.Worker> workers = new ArrayList<>(handles.size());
            for (Tuple2<RetrievableStateHandle<MesosWorkerStore.Worker>, String> handle : handles) {
                String failurePrefix = "Could not retrieve Mesos worker from state handle under " + handle.f1 + ". ";
                try {
                    workers.add(handle.f0.retrieveState());
                } catch (IOException e) {
                    throw new FlinkException(failurePrefix
                            + "This indicates that the retrieved state handle is broken. Try cleaning the state handle store.", e);
                } catch (ClassNotFoundException e) {
                    throw new FlinkException(failurePrefix
                            + "This indicates that you are trying to recover from state written by an older Flink version which is not compatible. Try cleaning the state handle store.", e);
                }
            }
            return workers;
        }
    }

    /**
     * Stores the given worker, replacing the existing entry if one is already present
     * under the worker's path and adding a new entry otherwise.
     *
     * @param worker the worker to store; must not be null
     * @throws Exception if the state handle store cannot be read or updated
     */
    @Override
    public void putWorker(MesosWorkerStore.Worker worker) throws Exception {
        Preconditions.checkNotNull(worker, "worker");
        String workerPath = getPathForWorker(worker.taskID());
        synchronized (startStopLock) {
            verifyIsRunning();
            IntegerResourceVersion currentVersion = workersInZooKeeper.exists(workerPath);
            if (currentVersion.isExisting()) {
                workersInZooKeeper.replace(workerPath, currentVersion, worker);
                LOG.debug("Updated {} in ZooKeeper.", worker);
            } else {
                workersInZooKeeper.addAndLock(workerPath, worker);
                LOG.debug("Added {} in ZooKeeper.", worker);
            }
        }
    }

    /**
     * Removes the worker with the given task ID from the state handle store.
     *
     * @param taskID the task ID of the worker to remove; must not be null
     * @return {@code true} if an entry existed and its removal was requested, {@code false}
     *     if no entry exists for the task ID
     * @throws Exception if the state handle store cannot be read or updated
     */
    @Override
    public boolean removeWorker(Protos.TaskID taskID) throws Exception {
        Preconditions.checkNotNull(taskID, "taskID");
        String workerPath = getPathForWorker(taskID);
        synchronized (startStopLock) {
            verifyIsRunning();
            if (!workersInZooKeeper.exists(workerPath).isExisting()) {
                LOG.debug("No such worker {} in ZooKeeper.", taskID);
                return false;
            }
            workersInZooKeeper.releaseAndTryRemove(workerPath);
            LOG.debug("Removed worker {} from ZooKeeper.", taskID);
            return true;
        }
    }

    /**
     * Builds the path under which a worker is stored: a slash followed by the task ID value.
     *
     * @param taskID the task ID; must not be null
     * @return the path for the worker
     */
    private static String getPathForWorker(Protos.TaskID taskID) {
        Preconditions.checkNotNull(taskID, "taskID");
        return String.format("/%s", taskID.getValue());
    }
}
