package org.apache.helix.manager.zk;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.helix.AccessOption;
import org.apache.helix.BucketDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
import org.apache.helix.zookeeper.util.GZipCompressionUtil;
import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/helix/manager/zk/ZkBucketDataAccessor.class */
public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
    private static final int DEFAULT_BUCKET_SIZE = 51200;
    private static final String BUCKET_SIZE_KEY = "BUCKET_SIZE";
    private static final String DATA_SIZE_KEY = "DATA_SIZE";
    private static final String METADATA_KEY = "METADATA";
    private static final String LAST_SUCCESSFUL_WRITE_KEY = "LAST_SUCCESSFUL_WRITE";
    private static final String LAST_WRITE_KEY = "LAST_WRITE";
    private final int _bucketSize;
    private final long _versionTTLms;
    private final ZkSerializer _zkSerializer;
    private final RealmAwareZkClient _zkClient;
    private final ZkBaseDataAccessor<byte[]> _zkBaseDataAccessor;
    private final Map<String, ScheduledFuture> _gcTaskFutureMap;
    private boolean _usesExternalZkClient;
    private static final Logger LOG = LoggerFactory.getLogger(ZkBucketDataAccessor.class);
    private static final long DEFAULT_VERSION_TTL = TimeUnit.MINUTES.toMillis(1);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private static final ScheduledExecutorService GC_THREAD = Executors.newSingleThreadScheduledExecutor(runnable -> {
        Thread thread = new Thread(runnable, "ZkBucketDataAccessorGcThread");
        thread.setDaemon(true);
        return thread;
    });

    public ZkBucketDataAccessor(String str, int i, long j) {
        this(createRealmAwareZkClient(str), i, j, false);
    }

    public ZkBucketDataAccessor(RealmAwareZkClient realmAwareZkClient) {
        this(realmAwareZkClient, DEFAULT_BUCKET_SIZE, DEFAULT_VERSION_TTL, true);
    }

    public ZkBucketDataAccessor(RealmAwareZkClient realmAwareZkClient, int i, long j) {
        this(realmAwareZkClient, i, j, true);
    }

    private ZkBucketDataAccessor(RealmAwareZkClient realmAwareZkClient, int i, long j, boolean z) {
        this._gcTaskFutureMap = new HashMap();
        this._usesExternalZkClient = false;
        this._zkClient = realmAwareZkClient;
        this._zkBaseDataAccessor = new ZkBaseDataAccessor<>(this._zkClient);
        this._zkSerializer = new org.apache.helix.zookeeper.datamodel.serializer.ZNRecordJacksonSerializer();
        this._bucketSize = i;
        this._versionTTLms = j;
        this._usesExternalZkClient = z;
    }

    public ZkBucketDataAccessor(String str) {
        this(str, DEFAULT_BUCKET_SIZE, DEFAULT_VERSION_TTL);
    }

    private static RealmAwareZkClient createRealmAwareZkClient(String str) {
        HelixZkClient federatedZkClient;
        if (Boolean.getBoolean("helix.multiZkEnabled") || str == null) {
            LOG.warn("ZkBucketDataAccessor: either multi-zk enabled or zkAddr is null - starting ZkBucketDataAccessor in multi-zk mode!");
            try {
                federatedZkClient = new FederatedZkClient(new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(), new RealmAwareZkClient.RealmAwareZkClientConfig());
            } catch (IllegalArgumentException | InvalidRoutingDataException e) {
                throw new HelixException("Not able to connect on realm-aware mode", e);
            }
        } else {
            federatedZkClient = DedicatedZkClientFactory.getInstance().buildZkClient(new HelixZkClient.ZkConnectionConfig(str));
        }
        federatedZkClient.setZkSerializer(new org.apache.helix.zookeeper.datamodel.serializer.ByteArraySerializer());
        return federatedZkClient;
    }

    @Override // org.apache.helix.BucketDataAccessor
    public <T extends HelixProperty> boolean compressedBucketWrite(String str, T t) throws IOException {
        ZkBaseDataAccessor<byte[]>.AccessResult doUpdate = this._zkBaseDataAccessor.doUpdate(str + "/" + LAST_WRITE_KEY, bArr -> {
            return (bArr == null || bArr.length == 0) ? "0".getBytes() : String.valueOf(Long.parseLong(new String(bArr)) + 1).getBytes();
        }, AccessOption.PERSISTENT);
        if (doUpdate._retCode != ZkBaseDataAccessor.RetCode.OK) {
            throw new HelixException(String.format("Failed to write the write version at path: %s!", str));
        }
        String str2 = new String(doUpdate._updatedValue);
        long parseLong = Long.parseLong(str2);
        String str3 = str + "/" + str2;
        byte[] compress = GZipCompressionUtil.compress(this._zkSerializer.serialize(t.getRecord()));
        int length = ((compress.length + this._bucketSize) - 1) / this._bucketSize;
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        int i = 0;
        for (int i2 = 0; i2 < length; i2++) {
            arrayList.add(str3 + "/" + i2);
            if (i2 == length - 1) {
                arrayList2.add(Arrays.copyOfRange(compress, i, i + (compress.length % this._bucketSize)));
            } else {
                arrayList2.add(Arrays.copyOfRange(compress, i, i + this._bucketSize));
            }
            i += this._bucketSize;
        }
        byte[] writeValueAsBytes = OBJECT_MAPPER.writeValueAsBytes(ImmutableMap.of(BUCKET_SIZE_KEY, Integer.toString(this._bucketSize), DATA_SIZE_KEY, Integer.toString(compress.length)));
        arrayList.add(str3 + "/" + METADATA_KEY);
        arrayList2.add(writeValueAsBytes);
        for (boolean z : this._zkBaseDataAccessor.setChildren(arrayList, arrayList2, AccessOption.PERSISTENT)) {
            if (!z) {
                throw new HelixException(String.format("Failed to write the data buckets for path: %s", str));
            }
        }
        if (!this._zkBaseDataAccessor.update(str + "/" + LAST_SUCCESSFUL_WRITE_KEY, bArr2 -> {
            if (bArr2 == null || bArr2.length == 0) {
                return str2.getBytes();
            }
            if (Long.parseLong(new String(bArr2)) < parseLong) {
                return str2.getBytes();
            }
            return null;
        }, AccessOption.PERSISTENT)) {
            throw new HelixException(String.format("Failed to write the last successful write metadata at path: %s!", str));
        }
        updateGCTimer(str, parseLong);
        return true;
    }

    @Override // org.apache.helix.BucketDataAccessor
    public <T extends HelixProperty> HelixProperty compressedBucketRead(String str, Class<T> cls) {
        return cls.cast(compressedBucketRead(str));
    }

    @Override // org.apache.helix.BucketDataAccessor
    public void compressedBucketDelete(String str) {
        if (!this._zkBaseDataAccessor.remove(str, AccessOption.PERSISTENT)) {
            throw new HelixException(String.format("Failed to delete the bucket data! Path: %s", str));
        }
        synchronized (this) {
            this._gcTaskFutureMap.remove(str);
        }
    }

    @Override // org.apache.helix.BucketDataAccessor
    public void disconnect() {
        if (this._usesExternalZkClient || this._zkClient == null || this._zkClient.isClosed()) {
            return;
        }
        this._zkClient.close();
    }

    private HelixProperty compressedBucketRead(String str) {
        byte[] bArr = this._zkBaseDataAccessor.get(str + "/" + LAST_SUCCESSFUL_WRITE_KEY, (Stat) null, AccessOption.PERSISTENT);
        if (bArr == null) {
            throw new ZkNoNodeException(String.format("Last successful write ZNode does not exist for path: %s", str));
        }
        String str2 = new String(bArr);
        byte[] bArr2 = this._zkBaseDataAccessor.get(str + "/" + str2 + "/" + METADATA_KEY, (Stat) null, AccessOption.PERSISTENT);
        if (bArr2 == null) {
            throw new ZkNoNodeException(String.format("Metadata ZNode does not exist for path: %s", str));
        }
        try {
            Map map = (Map) OBJECT_MAPPER.readValue(bArr2, Map.class);
            Object obj = map.get(BUCKET_SIZE_KEY);
            Object obj2 = map.get(DATA_SIZE_KEY);
            if (obj == null) {
                throw new HelixException(String.format("Metadata ZNRecord does not have %s! Path: %s", BUCKET_SIZE_KEY, str));
            }
            if (obj2 == null) {
                throw new HelixException(String.format("Metadata ZNRecord does not have %s! Path: %s", DATA_SIZE_KEY, str));
            }
            int parseInt = Integer.parseInt((String) obj);
            int parseInt2 = Integer.parseInt((String) obj2);
            int i = ((parseInt2 + this._bucketSize) - 1) / this._bucketSize;
            byte[] bArr3 = new byte[parseInt2];
            String str3 = str + "/" + str2;
            ArrayList arrayList = new ArrayList();
            for (int i2 = 0; i2 < i; i2++) {
                arrayList.add(str3 + "/" + i2);
            }
            List<byte[]> list = this._zkBaseDataAccessor.get((List<String>) arrayList, (List<Stat>) null, AccessOption.PERSISTENT, true);
            int i3 = 0;
            for (int i4 = 0; i4 < i; i4++) {
                if (i4 == i - 1) {
                    System.arraycopy(list.get(i4), 0, bArr3, i3, parseInt2 % parseInt);
                } else {
                    System.arraycopy(list.get(i4), 0, bArr3, i3, parseInt);
                    i3 += parseInt;
                }
            }
            try {
                return new HelixProperty((ZNRecord) this._zkSerializer.deserialize(GZipCompressionUtil.uncompress(new ByteArrayInputStream(bArr3))));
            } catch (IOException e) {
                throw new HelixException(String.format("Failed to decompress path: %s!", str), e);
            }
        } catch (IOException e2) {
            throw new HelixException(String.format("Failed to deserialize path metadata: %s!", str), e2);
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        disconnect();
    }

    public void finalize() {
        this._zkBaseDataAccessor.close();
        close();
    }

    private synchronized void updateGCTimer(String str, long j) {
        if (this._gcTaskFutureMap.containsKey(str)) {
            this._gcTaskFutureMap.remove(str).cancel(false);
        }
        this._gcTaskFutureMap.put(str, GC_THREAD.schedule(() -> {
            try {
                deleteStaleVersions(str, j);
            } catch (Exception e) {
                LOG.error("Failed to delete the stale versions.", e);
            }
        }, this._versionTTLms, TimeUnit.MILLISECONDS));
    }

    private void deleteStaleVersions(String str, long j) {
        List<String> childNames = this._zkBaseDataAccessor.getChildNames(str, AccessOption.PERSISTENT);
        if (childNames == null || childNames.isEmpty()) {
            return;
        }
        Iterator<String> it = getPathsToDelete(str, filterChildrenNames(childNames, j)).iterator();
        while (it.hasNext()) {
            this._zkBaseDataAccessor.remove(it.next(), AccessOption.PERSISTENT);
        }
    }

    private List<String> filterChildrenNames(List<String> list, long j) {
        ArrayList arrayList = new ArrayList();
        for (String str : list) {
            if (!str.equals(LAST_SUCCESSFUL_WRITE_KEY) && !str.equals(LAST_WRITE_KEY)) {
                try {
                    if (Long.parseLong(str) < j) {
                        arrayList.add(str);
                    }
                } catch (NumberFormatException e) {
                    LOG.warn("Found an invalid ZNode: {}", str);
                }
            }
        }
        return arrayList;
    }

    private List<String> getPathsToDelete(String str, List<String> list) {
        ArrayList arrayList = new ArrayList();
        list.forEach(str2 -> {
            arrayList.add(str + "/" + str2);
        });
        return arrayList;
    }
}
