package org.apache.seatunnel.connectors.seatunnel.redis.source;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
import redis.clients.jedis.Jedis;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.class */
/**
 * Single-split reader that fetches values from Redis and emits them as {@link SeaTunnelRow}s.
 *
 * <p>On each {@link #pollNext(Collector)} call the reader looks up every key matching the
 * configured pattern, reads the values stored under each key through the configured
 * {@link RedisDataType}, forwards them to the collector and then signals that no more elements
 * will follow.
 */
public class RedisSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
    private final RedisParameters redisParameters;
    private final SingleSplitReaderContext context;
    private final DeserializationSchema<SeaTunnelRow> deserializationSchema;
    private Jedis jedis;

    /**
     * Creates a reader; no connection is made until {@link #open()} is called.
     *
     * @param redisParameters connection settings, key pattern, data type and hash parse mode
     * @param singleSplitReaderContext context used to signal the end of the input
     * @param deserializationSchema schema used to turn raw values into rows; when {@code null}
     *     each raw value is emitted as a one-field row
     */
    public RedisSourceReader(RedisParameters redisParameters, SingleSplitReaderContext singleSplitReaderContext, DeserializationSchema<SeaTunnelRow> deserializationSchema) {
        this.redisParameters = redisParameters;
        this.context = singleSplitReaderContext;
        this.deserializationSchema = deserializationSchema;
    }

    /** Builds the Jedis client from the configured parameters. */
    public void open() throws Exception {
        this.jedis = this.redisParameters.buildJedis();
    }

    /** Closes the Jedis client if one was created by {@link #open()}. */
    public void close() throws IOException {
        if (Objects.nonNull(this.jedis)) {
            this.jedis.close();
        }
    }

    /**
     * Reads every value under every key matching the configured pattern, emits the resulting
     * rows and then signals that the reader has no more elements.
     *
     * @param collector receiver of the produced rows
     * @throws Exception if reading from Redis or deserializing a value fails
     */
    public void pollNext(Collector<SeaTunnelRow> collector) throws Exception {
        // NOTE(review): all matching keys are fetched with a single KEYS call; consider SCAN
        // if the keyspace can be large.
        Set<String> keys = this.jedis.keys(this.redisParameters.getKeysPattern());
        RedisDataType redisDataType = this.redisParameters.getRedisDataType();
        for (String key : keys) {
            for (String value : redisDataType.get(this.jedis, key)) {
                emitValue(value, redisDataType, collector);
            }
        }
        this.context.signalNoMoreElement();
    }

    /** Emits one raw Redis value, choosing between raw, per-hash-field and plain deserialization. */
    private void emitValue(String value, RedisDataType redisDataType, Collector<SeaTunnelRow> collector) throws Exception {
        if (this.deserializationSchema == null) {
            // Without a schema the raw string is the whole row.
            collector.collect(new SeaTunnelRow(new Object[]{value}));
            return;
        }
        boolean expandHashFields = this.redisParameters.getHashKeyParseMode() == RedisConfig.HashKeyParseMode.KV
                && redisDataType == RedisDataType.HASH;
        if (expandHashFields) {
            emitHashFields(value, collector);
        } else {
            this.deserializationSchema.deserialize(toUtf8(value), collector);
        }
    }

    /**
     * Treats {@code hashJson} as a JSON object and deserializes each of its entries as one record:
     * the entry value is parsed as a JSON object and the entry key is stored in it under the
     * name of the schema's field at index 0.
     */
    private void emitHashFields(String hashJson, Collector<SeaTunnelRow> collector) throws Exception {
        Map<String, String> hashFields = JsonUtils.toMap(hashJson);
        for (Map.Entry<String, String> field : hashFields.entrySet()) {
            Map<String, String> record = JsonUtils.toMap(field.getValue());
            record.put(this.deserializationSchema.getProducedType().getFieldName(0), field.getKey());
            this.deserializationSchema.deserialize(toUtf8(JsonUtils.toJsonString(record)), collector);
        }
    }

    /** Encodes text as UTF-8 so the bytes do not depend on the JVM's default charset. */
    private static byte[] toUtf8(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }
}
