package co.cask.cdap.data.tools;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.common.namespace.NamespaceQueryAdmin;
import co.cask.cdap.common.namespace.NamespacedLocationFactory;
import co.cask.cdap.data.stream.StreamUtils;
import co.cask.cdap.data2.util.TableId;
import co.cask.cdap.data2.util.hbase.HBaseTableUtil;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.NamespaceMeta;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.security.impersonation.Impersonator;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.inject.Inject;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.twill.filesystem.LocationFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/data/tools/StreamStateStoreUpgrader.class */
/**
 * Queue upgrader specialization for the stream state store tables.
 *
 * <p>It supplies one state store table id per namespace and converts row keys of the form
 * {@code namespace:foo/stream:bar} into the byte representation produced by
 * {@link Id.Stream#toBytes()}.
 */
public class StreamStateStoreUpgrader extends AbstractQueueUpgrader {
    private static final Logger LOG = LoggerFactory.getLogger(StreamStateStoreUpgrader.class);

    /** Prefix expected on the first segment of a row key. */
    private static final String NAMESPACE_PREFIX = "namespace:";

    /** Prefix expected on the second segment of a row key. */
    private static final String STREAM_PREFIX = "stream:";

    /** Separator between the namespace segment and the stream segment of a row key. */
    private static final String SEGMENT_SEPARATOR = "/";

    @Inject
    public StreamStateStoreUpgrader(LocationFactory locationFactory, HBaseTableUtil hBaseTableUtil, NamespacedLocationFactory namespacedLocationFactory, Configuration configuration, NamespaceQueryAdmin namespaceQueryAdmin, Impersonator impersonator) {
        super(locationFactory, namespacedLocationFactory, hBaseTableUtil, configuration, namespaceQueryAdmin, impersonator);
    }

    /**
     * Collects the stream state store table of every namespace returned by the namespace admin.
     *
     * @return a multimap from namespace id to the HBase table id of that namespace's state store
     * @throws Exception if listing the namespaces or building a table id fails
     */
    @Override // co.cask.cdap.data.tools.AbstractQueueUpgrader
    protected Multimap<NamespaceId, TableId> getTableIds() throws Exception {
        Multimap<NamespaceId, TableId> tableIds = HashMultimap.create();
        for (NamespaceMeta namespaceMeta : this.namespaceQueryAdmin.list()) {
            NamespaceId namespaceId = namespaceMeta.getNamespaceId();
            TableId stateStoreTableId = StreamUtils.getStateStoreTableId(namespaceId);
            tableIds.put(namespaceId, this.tableUtil.createHTableId(new NamespaceId(stateStoreTableId.getNamespace()), stateStoreTableId.getTableName()));
        }
        return tableIds;
    }

    /**
     * Converts a row key into its upgraded form.
     *
     * @param bArr the existing row key
     * @return the bytes of the parsed stream id, or {@code null} when the row key does not have
     *     the expected {@code namespace:foo/stream:bar} format
     */
    @Override // co.cask.cdap.data.tools.AbstractQueueUpgrader
    @Nullable
    protected byte[] processRowKey(byte[] bArr) {
        LOG.debug("Processing stream state for: {}", Bytes.toString(bArr));
        Id.Stream streamId = fromRowKey(bArr);
        if (streamId == null) {
            // fromRowKey has already logged why the key was rejected.
            return null;
        }
        // Only announce the upgrade once the key is known to be valid, so the log never
        // reports "Upgrading stream state for: null".
        LOG.debug("Upgrading stream state for: {}", streamId);
        return streamId.toBytes();
    }

    /**
     * Parses a row key of the form {@code namespace:foo/stream:bar} into a stream id.
     *
     * @param bArr the row key bytes
     * @return the stream id, or {@code null} (after logging a warning) when the key does not
     *     consist of exactly a {@code namespace:}-prefixed and a {@code stream:}-prefixed segment
     */
    @Nullable
    private Id.Stream fromRowKey(byte[] bArr) {
        String rowKey = Bytes.toString(bArr);
        // A negative limit keeps trailing empty segments, so keys such as
        // "namespace:foo/stream:bar/" are rejected instead of silently being accepted.
        String[] segments = rowKey.split(SEGMENT_SEPARATOR, -1);
        boolean wellFormed = segments.length == 2
            && segments[0].startsWith(NAMESPACE_PREFIX)
            && segments[1].startsWith(STREAM_PREFIX);
        if (!wellFormed) {
            LOG.warn("Unknown format for row key: {}. Expected namespace:foo/stream:bar.", rowKey);
            return null;
        }
        return Id.Stream.from(segments[0].substring(NAMESPACE_PREFIX.length()), segments[1].substring(STREAM_PREFIX.length()));
    }
}
