package co.cask.cdap.route.store;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.common.NotFoundException;
import co.cask.cdap.common.io.Codec;
import co.cask.cdap.common.service.ServiceDiscoverable;
import co.cask.cdap.common.zookeeper.ZKExtOperations;
import co.cask.cdap.proto.id.ProgramId;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import com.google.gson.Gson;
import com.google.inject.Inject;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.twill.zookeeper.NodeData;
import org.apache.twill.zookeeper.OperationFuture;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link RouteStore} implementation that keeps route configurations in ZooKeeper.
 *
 * <p>Each service's {@link RouteConfig} is written as JSON to the node returned by
 * {@link #getZKPath(ProgramId)}. Reads are served from an in-memory map of futures keyed by
 * {@link ProgramId}; ZooKeeper watches are used to swap in a fresh future when the node's data
 * changes and to drop the entry when the node is deleted.
 */
public class ZKRouteStore implements RouteStore {
    private static final Logger LOG = LoggerFactory.getLogger(ZKRouteStore.class);
    private static final Gson GSON = new Gson();

    /** Maximum time, in seconds, to block on a ZooKeeper operation or on a cached config future. */
    private static final int ZK_TIMEOUT_SECS = 5;

    /**
     * Last argument handed to {@code ZKExtOperations.createOrSet}.
     * NOTE(review): presumably the maximum number of failed attempts tolerated - confirm against ZKExtOperations.
     */
    private static final int ZK_CREATE_OR_SET_MAX_FAILURES = 10;

    /** Serializes a {@link RouteConfig} to and from the JSON bytes stored in the ZooKeeper node. */
    private static final Codec<RouteConfig> ROUTE_CONFIG_CODEC = new Codec<RouteConfig>() {
        @Override
        public byte[] encode(RouteConfig routeConfig) throws IOException {
            return Bytes.toBytes(GSON.toJson(routeConfig));
        }

        @Override
        public RouteConfig decode(byte[] data) throws IOException {
            return GSON.fromJson(Bytes.toString(data), RouteConfig.class);
        }
    };

    private final ZKClient zkClient;

    /** Cached (possibly still pending) route config per service. */
    private final ConcurrentMap<ProgramId, SettableFuture<RouteConfig>> routeConfigMap = Maps.newConcurrentMap();

    /**
     * Watcher attached to a service's route config node. Intended for internal use by the
     * enclosing store: on deletion it evicts the cached entry, on any other event it re-reads the
     * node (which also re-registers this watcher).
     */
    public class ZKRouteWatcher implements Watcher {
        private final ProgramId serviceId;

        ZKRouteWatcher(ProgramId programId) {
            this.serviceId = programId;
        }

        @Override
        public void process(WatchedEvent watchedEvent) {
            SettableFuture<RouteConfig> current = routeConfigMap.get(serviceId);
            if (current == null) {
                // Nothing is cached for this service any more, so there is nothing to refresh.
                return;
            }
            if (watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted) {
                // Only evict the future we observed; a newer one may have been installed meanwhile.
                routeConfigMap.remove(serviceId, current);
            } else {
                getAndWatchData(serviceId, SettableFuture.<RouteConfig>create(), current, this);
            }
        }
    }

    @Inject
    public ZKRouteStore(ZKClient zKClient) {
        this.zkClient = zKClient;
    }

    /**
     * Writes the route config to ZooKeeper and, once the write succeeds, updates the local cache.
     *
     * @param programId service whose route config is being stored
     * @param routeConfig config to persist
     * @throws RuntimeException (via {@link Throwables#propagate}) if the write fails, times out
     *     after {@link #ZK_TIMEOUT_SECS} seconds, or the calling thread is interrupted
     */
    @Override // co.cask.cdap.route.store.RouteStore
    public void store(ProgramId programId, RouteConfig routeConfig) {
        Supplier<RouteConfig> configSupplier = Suppliers.ofInstance(routeConfig);
        // Capture the cached future before writing so the cache update below is a compare-and-swap.
        SettableFuture<RouteConfig> previous = routeConfigMap.get(programId);
        try {
            ZKExtOperations.createOrSet(zkClient, getZKPath(programId), configSupplier, ROUTE_CONFIG_CODEC,
                                        ZK_CREATE_OR_SET_MAX_FAILURES)
                .get(ZK_TIMEOUT_SECS, TimeUnit.SECONDS);

            SettableFuture<RouteConfig> updated = SettableFuture.create();
            updated.set(routeConfig);
            if (previous != null) {
                routeConfigMap.replace(programId, previous, updated);
            } else {
                routeConfigMap.putIfAbsent(programId, updated);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw Throwables.propagate(e);
        } catch (ExecutionException | TimeoutException e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Deletes the route config node for the given service and evicts the cached entry.
     *
     * @param programId service whose route config is being deleted
     * @throws NotFoundException if ZooKeeper reports that the node does not exist
     * @throws RuntimeException (via {@link Throwables#propagate}) if the delete fails for another
     *     reason, times out after {@link #ZK_TIMEOUT_SECS} seconds, or the thread is interrupted
     */
    @Override // co.cask.cdap.route.store.RouteStore
    public void delete(ProgramId programId) throws NotFoundException {
        OperationFuture<String> deletion = zkClient.delete(getZKPath(programId));
        SettableFuture<RouteConfig> cached = routeConfigMap.get(programId);
        try {
            deletion.get(ZK_TIMEOUT_SECS, TimeUnit.SECONDS);
            routeConfigMap.remove(programId, cached);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw Throwables.propagate(e);
        } catch (ExecutionException | TimeoutException e) {
            if (e.getCause() instanceof KeeperException.NoNodeException) {
                throw new NotFoundException(String.format("Route Config for Service %s was not found.", programId));
            }
            throw Throwables.propagate(e);
        }
    }

    /**
     * Returns the route config for the given service, reading it from ZooKeeper (and registering a
     * watch) the first time the service is requested.
     *
     * @param programId service whose route config is requested
     * @return the cached or freshly read config; an empty config if it cannot be obtained within
     *     {@link #ZK_TIMEOUT_SECS} seconds or the read failed
     */
    @Override // co.cask.cdap.route.store.RouteStore
    public RouteConfig fetch(ProgramId programId) {
        Future<RouteConfig> configFuture = routeConfigMap.get(programId);
        if (configFuture == null) {
            SettableFuture<RouteConfig> pending = SettableFuture.create();
            configFuture = routeConfigMap.putIfAbsent(programId, pending);
            if (configFuture == null) {
                // This thread won the race to register the future, so it issues the ZK read.
                configFuture = getAndWatchData(programId, pending, pending, new ZKRouteWatcher(programId));
            }
        }
        return getConfig(programId, configFuture);
    }

    /**
     * Waits for the given future and falls back to an empty {@link RouteConfig} on any failure.
     */
    private RouteConfig getConfig(ProgramId programId, Future<RouteConfig> future) {
        try {
            return future.get(ZK_TIMEOUT_SECS, TimeUnit.SECONDS);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // The failure is deliberately absorbed below, but the interrupt itself must not be lost.
                Thread.currentThread().interrupt();
            }
            LOG.debug("Getting configuration for service {} from ZK failed.", programId, e);
            return new RouteConfig(Collections.emptyMap());
        }
    }

    /** Builds the ZooKeeper node path under which the service's route config is stored. */
    private static String getZKPath(ProgramId programId) {
        return String.format("/routestore/%s", ServiceDiscoverable.getName(programId));
    }

    /**
     * Asynchronously reads the service's route config node, registering {@code watcher} on it.
     * Intended for internal use by this store and its watchers.
     *
     * @param programId service whose node is read
     * @param settableFuture future completed with the outcome of this read
     * @param settableFuture2 future expected to be in the cache at the moment; on a successful
     *     read it is replaced by {@code settableFuture}
     * @param watcher watcher to register on the node
     * @return {@code settableFuture}
     */
    public Future<RouteConfig> getAndWatchData(final ProgramId programId,
                                               final SettableFuture<RouteConfig> settableFuture,
                                               final SettableFuture<RouteConfig> settableFuture2,
                                               Watcher watcher) {
        OperationFuture<NodeData> dataFuture = zkClient.getData(getZKPath(programId), watcher);
        Futures.addCallback(dataFuture, new FutureCallback<NodeData>() {
            @Override
            public void onSuccess(NodeData nodeData) {
                try {
                    settableFuture.set(ROUTE_CONFIG_CODEC.decode(nodeData.getData()));
                    // Publish the new value only if the cache still holds the future we expect.
                    routeConfigMap.replace(programId, settableFuture2, settableFuture);
                } catch (Exception e) {
                    LOG.debug("Unable to deserialize the config for service {}. Got data {}", programId, nodeData.getData());
                    // If the failed future is the cached one, evict it so later fetches retry the read.
                    routeConfigMap.remove(programId, settableFuture);
                    settableFuture.setException(e);
                }
            }

            @Override
            public void onFailure(Throwable th) {
                if (th instanceof KeeperException.NoNodeException) {
                    // A missing node is reported as an empty config rather than an error; watch for
                    // the node being created so the config can be loaded later.
                    settableFuture.set(new RouteConfig(Collections.emptyMap()));
                    existsAndWatch(programId, settableFuture);
                } else {
                    settableFuture.setException(th);
                    // If the failed future is the cached one, evict it so later fetches retry the read.
                    routeConfigMap.remove(programId, settableFuture);
                }
            }
        });
        return settableFuture;
    }

    /**
     * Checks whether the service's route config node exists and watches for its creation. When the
     * node exists (or is later created while the service is still cached) the data is read via
     * {@link #getAndWatchData}. Intended for internal use by this store.
     *
     * @param programId service whose node is checked
     * @param settableFuture future expected to be in the cache; passed as the future to replace
     */
    public void existsAndWatch(final ProgramId programId, final SettableFuture<RouteConfig> settableFuture) {
        Watcher creationWatcher = new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                if (routeConfigMap.containsKey(programId)
                    && watchedEvent.getType() == Watcher.Event.EventType.NodeCreated) {
                    getAndWatchData(programId, SettableFuture.<RouteConfig>create(), settableFuture,
                                    new ZKRouteWatcher(programId));
                }
            }
        };
        OperationFuture<Stat> existsFuture = zkClient.exists(getZKPath(programId), creationWatcher);
        Futures.addCallback(existsFuture, new FutureCallback<Stat>() {
            @Override
            public void onSuccess(@Nullable Stat stat) {
                // A non-null Stat means the node exists already, so read it right away.
                if (stat != null) {
                    getAndWatchData(programId, SettableFuture.<RouteConfig>create(), settableFuture,
                                    new ZKRouteWatcher(programId));
                }
            }

            @Override
            public void onFailure(Throwable th) {
                routeConfigMap.remove(programId);
                LOG.debug("Failed to check exists for property data for {}", programId, th);
            }
        });
    }

    /** Clears the in-memory cache of route configs. */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        routeConfigMap.clear();
    }
}
