package com.couchbase.kafka.state;

import com.couchbase.client.core.dcp.BucketStreamAggregatorState;
import com.couchbase.client.core.dcp.BucketStreamState;
import com.couchbase.client.deps.com.fasterxml.jackson.databind.JsonNode;
import com.couchbase.client.deps.com.fasterxml.jackson.databind.ObjectMapper;
import com.couchbase.client.deps.com.fasterxml.jackson.databind.node.ObjectNode;
import com.couchbase.kafka.CouchbaseKafkaEnvironment;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/couchbase/kafka/state/ZookeeperStateSerializer.class */
/**
 * {@link StateSerializer} that stores per-partition stream state in ZooKeeper.
 *
 * <p>The state of each partition is written as a JSON string to the node
 * {@code /couchbase-kafka-connector/<bucket>/<partition>}. Writes are throttled
 * per partition: a partition is written again only once more than
 * {@code couchbaseStateSerializationThreshold} milliseconds have passed since
 * its previous write by this instance.
 */
public class ZookeeperStateSerializer implements StateSerializer {
    private static final ObjectMapper MAPPER = new ObjectMapper();
    private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperStateSerializer.class);

    /** Root node under which all connector state is kept. */
    private static final String ROOT_PATH = "/couchbase-kafka-connector";
    /** Passed as the session timeout argument of the ZkClient constructor. */
    private static final int ZK_SESSION_TIMEOUT_MS = 4000;
    /** Passed as the connection timeout argument of the ZkClient constructor. */
    private static final int ZK_CONNECTION_TIMEOUT_MS = 6000;

    private static final String FIELD_VBUCKET_UUID = "vbucketUUID";
    private static final String FIELD_START_SEQNO = "startSequenceNumber";
    private static final String FIELD_END_SEQNO = "endSequenceNumber";
    private static final String FIELD_SNAPSHOT_START_SEQNO = "snapshotStartSequenceNumber";
    private static final String FIELD_SNAPSHOT_END_SEQNO = "snapshotEndSequenceNumber";

    private final ZkClient zkClient;
    private final String bucket;
    /** Minimum interval, in milliseconds, between two writes of the same partition. */
    private final long stateSerializationThreshold;
    /**
     * Time of the last successful write per partition, in epoch milliseconds.
     * A partition without an entry is treated as last written at time 0.
     * Tracking this per partition (rather than in one shared timestamp) keeps a
     * write of one partition from suppressing the writes of all the others.
     */
    private final Map<Short, Long> updatedAt = new ConcurrentHashMap<Short, Long>();
    // Not read or written by any method of this class.
    private volatile BucketStreamAggregatorState state = new BucketStreamAggregatorState();

    /**
     * Connects to ZooKeeper and captures the bucket name and write threshold.
     *
     * @param couchbaseKafkaEnvironment supplies the ZooKeeper address, the bucket
     *                                  name and the serialization threshold
     */
    public ZookeeperStateSerializer(CouchbaseKafkaEnvironment couchbaseKafkaEnvironment) {
        this.zkClient = new ZkClient(
                couchbaseKafkaEnvironment.kafkaZookeeperAddress(),
                ZK_SESSION_TIMEOUT_MS,
                ZK_CONNECTION_TIMEOUT_MS,
                ZKStringSerializer$.MODULE$);
        this.bucket = couchbaseKafkaEnvironment.couchbaseBucket();
        this.stateSerializationThreshold = couchbaseKafkaEnvironment.couchbaseStateSerializationThreshold();
    }

    /**
     * Writes the state of every partition in the aggregator, subject to the
     * per-partition threshold.
     *
     * @param bucketStreamAggregatorState the aggregated state to persist
     */
    @Override
    public void dump(BucketStreamAggregatorState bucketStreamAggregatorState) {
        Iterator<BucketStreamState> streams = bucketStreamAggregatorState.iterator();
        while (streams.hasNext()) {
            dump(bucketStreamAggregatorState, streams.next().partition());
        }
    }

    /**
     * Writes the state of a single partition unless that partition was written
     * within the last {@code stateSerializationThreshold} milliseconds.
     *
     * @param bucketStreamAggregatorState the aggregated state to read from
     * @param s                           the partition whose state is persisted
     */
    @Override
    public void dump(BucketStreamAggregatorState bucketStreamAggregatorState, short s) {
        long now = System.currentTimeMillis();
        Short partition = Short.valueOf(s);
        Long previous = this.updatedAt.get(partition);
        long lastWrite = previous == null ? 0L : previous.longValue();
        if (now - lastWrite <= this.stateSerializationThreshold) {
            return;
        }

        BucketStreamState streamState = bucketStreamAggregatorState.get(s);
        if (streamState == null) {
            // Nothing to persist; skip instead of failing with a NullPointerException.
            LOGGER.debug("No state for partition {}, skipping dump", partition);
            return;
        }

        ObjectNode json = MAPPER.createObjectNode();
        json.put(FIELD_VBUCKET_UUID, streamState.vbucketUUID());
        json.put(FIELD_START_SEQNO, streamState.startSequenceNumber());
        json.put(FIELD_END_SEQNO, streamState.endSequenceNumber());
        json.put(FIELD_SNAPSHOT_START_SEQNO, streamState.snapshotStartSequenceNumber());
        json.put(FIELD_SNAPSHOT_END_SEQNO, streamState.snapshotEndSequenceNumber());

        String path = pathForState(s);
        // Second argument asks ZkClient to create missing parent nodes as well.
        this.zkClient.createPersistent(path, true);
        this.zkClient.writeData(path, json.toString());
        // Recorded only after the write returned, so a failed write is retried on the next call.
        this.updatedAt.put(partition, Long.valueOf(now));
    }

    /**
     * Loads the stored state of every partition in the aggregator and puts each
     * state that could be read back into it.
     *
     * @param bucketStreamAggregatorState the aggregator to update
     * @return the same aggregator instance that was passed in
     */
    @Override
    public BucketStreamAggregatorState load(BucketStreamAggregatorState bucketStreamAggregatorState) {
        Iterator<BucketStreamState> streams = bucketStreamAggregatorState.iterator();
        while (streams.hasNext()) {
            BucketStreamState loaded = load(bucketStreamAggregatorState, streams.next().partition());
            if (loaded != null) {
                bucketStreamAggregatorState.put(loaded, false);
            }
        }
        return bucketStreamAggregatorState;
    }

    /**
     * Reads the stored state of a single partition.
     *
     * <p>Fields that are absent from the stored JSON are read as {@code 0}.
     *
     * @param bucketStreamAggregatorState not used by this implementation
     * @param s                           the partition to load
     * @return the decoded state, or {@code null} when no data is stored for the
     *         partition or the stored data cannot be parsed
     */
    @Override
    public BucketStreamState load(BucketStreamAggregatorState bucketStreamAggregatorState, short s) {
        // Second argument makes ZkClient return null instead of throwing when the node is missing.
        String json = (String) this.zkClient.readData(pathForState(s), true);
        if (json == null) {
            return null;
        }
        try {
            JsonNode root = MAPPER.readTree(json);
            if (root == null) {
                return null;
            }
            return new BucketStreamState(
                    s,
                    longField(root, FIELD_VBUCKET_UUID),
                    longField(root, FIELD_START_SEQNO),
                    longField(root, FIELD_END_SEQNO),
                    longField(root, FIELD_SNAPSHOT_START_SEQNO),
                    longField(root, FIELD_SNAPSHOT_END_SEQNO));
        } catch (IOException e) {
            LOGGER.warn("Error while decoding state", e);
            return null;
        }
    }

    /**
     * Reads a numeric field, yielding {@code 0} when it is missing.
     * Uses {@code path} rather than {@code get} because {@code get} returns
     * {@code null} for an absent field, which would bypass the default.
     */
    private static long longField(JsonNode root, String name) {
        return root.path(name).asLong(0L);
    }

    /**
     * Builds the ZooKeeper node path for a partition. The path is assembled with
     * explicit {@code '/'} separators so it does not depend on the separator of
     * the local file system.
     */
    private String pathForState(int i) {
        return ROOT_PATH + "/" + this.bucket + "/" + i;
    }
}
