package org.apache.nifi.redis.service;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.redis.RedisConnectionPool;
import org.apache.nifi.redis.RedisType;
import org.apache.nifi.redis.util.RedisAction;
import org.apache.nifi.util.Tuple;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.core.types.Expiration;

@CapabilityDescription("An implementation of DistributedMapCacheClient that uses Redis as the backing cache. This service relies on the WATCH, MULTI, and EXEC commands in Redis, which are not fully supported when Redis is clustered. As a result, this service can only be used with a Redis Connection Pool that is configured for standalone or sentinel mode. Sentinel mode can be used to provide high-availability configurations.")
@Tags({"redis", "distributed", "cache", "map"})
/* loaded from: input_file:org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.class */
public class RedisDistributedMapCacheClientService extends AbstractControllerService implements AtomicDistributedMapCacheClient<byte[]> {
    /** Required property that references the {@link RedisConnectionPool} controller service. */
    public static final PropertyDescriptor REDIS_CONNECTION_POOL = new PropertyDescriptor.Builder().name("redis-connection-pool").displayName("Redis Connection Pool").identifiesControllerService(RedisConnectionPool.class).required(true).build();
    /** Required time-period property (default "0 secs") describing how long data should exist in Redis. */
    public static final PropertyDescriptor TTL = new PropertyDescriptor.Builder().name("redis-cache-ttl").displayName("TTL").description("Indicates how long the data should exist in Redis. Setting '0 secs' would mean the data would exist forever").addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).required(true).defaultValue("0 secs").build();
    /** Property descriptors supported by this service; assigned once in the static initializer of this class. */
    static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
    /** Connection pool resolved in {@code onEnabled} and reset to {@code null} in {@code onDisabled}. */
    private volatile RedisConnectionPool redisConnectionPool;
    /** Configured TTL in seconds; {@code onEnabled} stores -1 when the configured value is 0. */
    private Long ttl;

    /**
     * Returns the property descriptors supported by this service.
     *
     * @return the shared {@code PROPERTY_DESCRIPTORS} list
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    /**
     * Validates that the configured Redis Connection Pool is not in clustered mode.
     *
     * @param validationContext context used to resolve the {@code REDIS_CONNECTION_POOL} property
     * @return a collection holding one invalid result when the pool reports {@link RedisType#CLUSTER},
     *         otherwise an empty collection
     */
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        final List<ValidationResult> results = new ArrayList<>();

        final RedisConnectionPool pool = validationContext.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);
        if (pool == null) {
            // Nothing to check until a connection pool has been selected.
            return results;
        }

        // Enum identity comparison is null-safe, so no separate null check on the type is needed.
        if (pool.getRedisType() == RedisType.CLUSTER) {
            final String subject = REDIS_CONNECTION_POOL.getDisplayName();
            results.add(new ValidationResult.Builder()
                    .subject(subject)
                    .valid(false)
                    .explanation(subject + " is configured in clustered mode, and this service requires a non-clustered Redis")
                    .build());
        }
        return results;
    }

    /**
     * Captures the connection pool and the TTL (in seconds) from the configuration.
     * A configured TTL of 0 is stored as -1.
     *
     * @param configurationContext the configuration of this service
     */
    @OnEnabled
    public void onEnabled(ConfigurationContext configurationContext) {
        this.redisConnectionPool = configurationContext.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);

        final Long configuredSeconds = configurationContext.getProperty(TTL).asTimePeriod(TimeUnit.SECONDS);
        final boolean neverExpires = configuredSeconds.longValue() == 0;
        this.ttl = neverExpires ? Long.valueOf(-1L) : configuredSeconds;
    }

    /**
     * Drops the reference to the connection pool when the service is disabled.
     */
    @OnDisabled
    public void onDisabled() {
        this.redisConnectionPool = null;
    }

    /**
     * Stores the value under the key only when the key is not already present (SETNX).
     * When a TTL is configured (anything other than -1) and the value was stored, the
     * expiration is applied to the key afterwards.
     *
     * @param k           the key
     * @param v           the value
     * @param serializer  serializer for the key
     * @param serializer2 serializer for the value
     * @return {@code true} if the value was stored, {@code false} if the key already existed
     * @throws IOException if serialization or the Redis interaction fails
     */
    public <K, V> boolean putIfAbsent(K k, V v, Serializer<K> serializer, Serializer<V> serializer2) throws IOException {
        return withConnection(redisConnection -> {
            final Tuple<byte[], byte[]> entry = serialize(k, v, serializer, serializer2);
            final byte[] keyBytes = entry.getKey();

            final boolean stored = redisConnection.setNX(keyBytes, entry.getValue());
            final long ttlSeconds = this.ttl.longValue();
            if (stored && ttlSeconds != -1) {
                redisConnection.expire(keyBytes, ttlSeconds);
            }
            return stored;
        });
    }

    /**
     * Atomically returns the current value for the key, or stores the given value when the key is absent.
     * Uses WATCH / MULTI / EXEC and retries while the service is enabled and the transaction yields no results.
     *
     * @param k            the key
     * @param v            the value to store when the key is absent
     * @param serializer   serializer for the key
     * @param serializer2  serializer for the value
     * @param deserializer deserializer for the existing value
     * @return {@code null} when the given value was stored (or the service was disabled before a
     *         transaction succeeded); otherwise the deserialized value read before the transaction
     * @throws IOException if serialization fails or the transaction returns an unexpected result type
     */
    public <K, V> V getAndPutIfAbsent(K k, V v, Serializer<K> serializer, Serializer<V> serializer2, Deserializer<V> deserializer) throws IOException {
        return withConnection(redisConnection -> {
            final Tuple<byte[], byte[]> entry = serialize(k, v, serializer, serializer2);
            final byte[] keyBytes = entry.getKey();
            final byte[] valueBytes = entry.getValue();

            do {
                // Watch the key so the transaction is aborted if it changes after the read below.
                redisConnection.watch(keyBytes);
                final byte[] existing = redisConnection.get(keyBytes);

                redisConnection.multi();
                redisConnection.setNX(keyBytes, valueBytes);
                // Only a newly created key gets the expiration; an existing key keeps its own.
                if (this.ttl.longValue() != -1 && existing == null) {
                    redisConnection.expire(keyBytes, this.ttl.longValue());
                }
                final List<Object> results = redisConnection.exec();

                // A null or empty result means the transaction did not run (e.g. it was aborted) - retry.
                if (results != null && !results.isEmpty()) {
                    final Object setNxResult = results.get(0);
                    if (!(setNxResult instanceof Boolean)) {
                        final String typeName = setNxResult == null ? "null" : setNxResult.getClass().getName();
                        throw new IOException("Unexpected result from Redis transaction: Expected Boolean result, but got " + typeName + " with value " + String.valueOf(setNxResult));
                    }
                    // true -> our value was stored, so there was no previous value to hand back.
                    if (((Boolean) setNxResult).booleanValue()) {
                        return null;
                    }
                    return deserializer.deserialize(existing);
                }
            } while (isEnabled());

            return null;
        });
    }

    /**
     * Checks whether the serialized key exists in Redis.
     *
     * @param k          the key
     * @param serializer serializer for the key
     * @return {@code true} if Redis reports the key as existing
     * @throws IOException if serialization or the Redis interaction fails
     */
    public <K> boolean containsKey(K k, Serializer<K> serializer) throws IOException {
        return withConnection(redisConnection -> {
            final byte[] keyBytes = serialize(k, serializer);
            return redisConnection.exists(keyBytes);
        });
    }

    /**
     * Stores the value under the key unconditionally, applying the configured TTL when one is set.
     *
     * @param k           the key
     * @param v           the value
     * @param serializer  serializer for the key
     * @param serializer2 serializer for the value
     * @throws IOException if serialization or the Redis interaction fails
     */
    public <K, V> void put(K k, V v, Serializer<K> serializer, Serializer<V> serializer2) throws IOException {
        withConnection(redisConnection -> {
            final Tuple<byte[], byte[]> entry = serialize(k, v, serializer, serializer2);
            final long ttlSeconds = this.ttl.longValue();
            // -1 is this service's marker for "no expiration"; state that explicitly instead of
            // relying on how a negative number of seconds is interpreted.
            final Expiration expiration = ttlSeconds == -1 ? Expiration.persistent() : Expiration.seconds(ttlSeconds);
            // Pass an explicit upsert option rather than null, which is not accepted by all client versions.
            redisConnection.set(entry.getKey(), entry.getValue(), expiration, RedisStringCommands.SetOption.upsert());
            return null;
        });
    }

    /**
     * Reads the bytes stored under the serialized key and hands them to the deserializer.
     * The deserializer receives whatever Redis returned for the key.
     *
     * @param k            the key
     * @param serializer   serializer for the key
     * @param deserializer deserializer for the stored value
     * @return the result of the deserializer
     * @throws IOException if serialization, deserialization, or the Redis interaction fails
     */
    public <K, V> V get(K k, Serializer<K> serializer, Deserializer<V> deserializer) throws IOException {
        return withConnection(redisConnection -> {
            final byte[] keyBytes = serialize(k, serializer);
            final byte[] storedBytes = redisConnection.get(keyBytes);
            return deserializer.deserialize(storedBytes);
        });
    }

    /**
     * No-op: this method releases nothing. Connections are closed per operation in {@code withConnection}.
     */
    public void close() throws IOException {
    }

    /**
     * Deletes the entry stored under the key.
     *
     * @param k          the key
     * @param serializer serializer for the key
     * @return {@code true} if Redis reported at least one deleted key
     * @throws IOException if serialization or the Redis interaction fails
     */
    public <K> boolean remove(K k, Serializer<K> serializer) throws IOException {
        return withConnection(redisConnection -> {
            final byte[] keyBytes = serialize(k, serializer);
            // del takes a varargs of keys, so the single key can be passed directly.
            final Long deletedCount = redisConnection.del(keyBytes);
            return deletedCount != null && deletedCount.longValue() > 0;
        });
    }

    /**
     * Deletes every key matching the given pattern.
     * Keys are discovered with SCAN (count hint of 100) and deleted in batches of up to 1000.
     *
     * @param str the key pattern passed to the scan's match option
     * @return the total number of keys Redis reported as deleted
     * @throws IOException if the Redis interaction fails or the cursor cannot be closed
     */
    public long removeByPattern(String str) throws IOException {
        return withConnection(redisConnection -> {
            final int deleteBatchSize = 1000;
            final ScanOptions scanOptions = ScanOptions.scanOptions().count(100L).match(str).build();

            long deletedCount = 0;
            final List<byte[]> pendingKeys = new ArrayList<>();

            // The cursor holds scan state and must be closed once iteration is finished.
            try (Cursor<byte[]> cursor = redisConnection.scan(scanOptions)) {
                while (cursor.hasNext()) {
                    pendingKeys.add(cursor.next());
                    if (pendingKeys.size() == deleteBatchSize) {
                        deletedCount += redisConnection.del(getKeys(pendingKeys)).longValue();
                        pendingKeys.clear();
                    }
                }
            }

            // Flush whatever is left over from the last, partially filled batch.
            if (!pendingKeys.isEmpty()) {
                deletedCount += redisConnection.del(getKeys(pendingKeys)).longValue();
                pendingKeys.clear();
            }
            return deletedCount;
        });
    }

    /* JADX WARN: Type inference failed for: r0v2, types: [byte[], byte[][]] */
    private byte[][] getKeys(List<byte[]> list) {
        ?? r0 = new byte[list.size()];
        for (int i = 0; i < list.size(); i++) {
            r0[i] = list.get(i);
        }
        return r0;
    }

    /**
     * Fetches the entry for the key together with a revision, which is the raw stored bytes.
     *
     * @param k            the key
     * @param serializer   serializer for the key
     * @param deserializer deserializer for the stored value
     * @return the cache entry, or {@code null} when Redis has no value for the key
     * @throws IOException if serialization, deserialization, or the Redis interaction fails
     */
    public <K, V> AtomicCacheEntry<K, V, byte[]> fetch(K k, Serializer<K> serializer, Deserializer<V> deserializer) throws IOException {
        return withConnection(redisConnection -> {
            final byte[] keyBytes = serialize(k, serializer);
            final byte[] storedBytes = redisConnection.get(keyBytes);
            if (storedBytes != null) {
                final V value = deserializer.deserialize(storedBytes);
                // The stored bytes double as the revision used later by replace().
                return new AtomicCacheEntry<K, V, byte[]>(k, value, storedBytes);
            }
            return null;
        });
    }

    /**
     * Replaces the value of an entry only if the currently stored bytes equal the entry's revision
     * (compare-and-set using WATCH / MULTI / EXEC).
     *
     * @param atomicCacheEntry the entry carrying the key, the new value, and the expected revision
     * @param serializer       serializer for the key
     * @param serializer2      serializer for the value
     * @return {@code true} if the transaction produced at least one result, {@code false} otherwise
     * @throws IOException if serialization or the Redis interaction fails
     */
    public <K, V> boolean replace(AtomicCacheEntry<K, V, byte[]> atomicCacheEntry, Serializer<K> serializer, Serializer<V> serializer2) throws IOException {
        return withConnection(redisConnection -> {
            final Tuple<byte[], byte[]> entry = serialize(atomicCacheEntry.getKey(), atomicCacheEntry.getValue(), serializer, serializer2);
            final byte[] keyBytes = entry.getKey();
            final byte[] newValue = entry.getValue();
            final byte[] expectedValue = atomicCacheEntry.getRevision().orElse(null);

            // Watch the key so the transaction is aborted if it changes after the read below.
            redisConnection.watch(keyBytes);
            final byte[] currentValue = redisConnection.get(keyBytes);

            redisConnection.multi();
            if (Arrays.equals(expectedValue, currentValue)) {
                // getSet (rather than set) is queued so that a successful transaction yields a result entry.
                redisConnection.getSet(keyBytes, newValue);
                // Overwriting the value discards any existing expiration, so re-apply the configured TTL.
                if (this.ttl.longValue() != -1) {
                    redisConnection.expire(keyBytes, this.ttl.longValue());
                }
            }

            // Depending on the client, an aborted transaction may be reported as null rather than an empty list.
            final List<Object> results = redisConnection.exec();
            return results != null && !results.isEmpty();
        });
    }

    /**
     * Serializes a key and a value into byte arrays, key first.
     *
     * @param k           the key
     * @param v           the value
     * @param serializer  serializer for the key
     * @param serializer2 serializer for the value
     * @return a tuple whose key is the serialized key and whose value is the serialized value
     * @throws IOException if either serializer fails
     */
    private <K, V> Tuple<byte[], byte[]> serialize(K k, V v, Serializer<K> serializer, Serializer<V> serializer2) throws IOException {
        final ByteArrayOutputStream keyBuffer = new ByteArrayOutputStream();
        serializer.serialize(k, keyBuffer);
        final byte[] keyBytes = keyBuffer.toByteArray();

        final ByteArrayOutputStream valueBuffer = new ByteArrayOutputStream();
        serializer2.serialize(v, valueBuffer);
        final byte[] valueBytes = valueBuffer.toByteArray();

        return new Tuple<>(keyBytes, valueBytes);
    }

    /**
     * Serializes a single key into a byte array.
     *
     * @param k          the key
     * @param serializer serializer for the key
     * @return the bytes the serializer wrote
     * @throws IOException if the serializer fails
     */
    private <K> byte[] serialize(K k, Serializer<K> serializer) throws IOException {
        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        serializer.serialize(k, buffer);
        final byte[] keyBytes = buffer.toByteArray();
        return keyBytes;
    }

    /**
     * Obtains a connection from the pool, runs the action with it, and always closes the connection afterwards.
     * A failure while closing is logged as a warning and does not replace the action's result or exception.
     *
     * @param redisAction the work to perform with the connection
     * @return whatever the action returns
     * @throws IOException if the action throws it
     */
    private <T> T withConnection(RedisAction<T> redisAction) throws IOException {
        RedisConnection redisConnection = null;
        try {
            redisConnection = this.redisConnectionPool.getConnection();
            return redisAction.execute(redisConnection);
        } finally {
            // Single cleanup path: runs once whether the action returned or threw.
            if (redisConnection != null) {
                try {
                    redisConnection.close();
                } catch (Exception e) {
                    getLogger().warn("Error closing connection: " + e.getMessage(), e);
                }
            }
        }
    }

    // Publishes the supported properties as an unmodifiable list: connection pool first, then TTL.
    static {
        PROPERTY_DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(REDIS_CONNECTION_POOL, TTL));
    }
}
