package com.couchbase.kafka.state;

import com.couchbase.client.deps.com.fasterxml.jackson.databind.JsonNode;
import com.couchbase.client.deps.com.fasterxml.jackson.databind.ObjectMapper;
import com.couchbase.client.deps.com.fasterxml.jackson.databind.node.ObjectNode;
import com.couchbase.kafka.CouchbaseKafkaEnvironment;
import java.io.IOException;
import java.util.Iterator;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/couchbase/kafka/state/ZookeeperStateSerializer.class */
/**
 * {@link StateSerializer} that keeps per-partition stream state in Zookeeper.
 *
 * <p>Each partition's state is stored as a small JSON object (fields {@code vbucketUUID} and
 * {@code sequenceNumber}) in a persistent node at
 * {@code /couchbase-kafka-connector2/<bucket>/<partition>}.
 *
 * <p>Both {@code dump} overloads are throttled by a single shared timestamp: a dump is only
 * performed when more than {@code stateSerializationThreshold} milliseconds have passed since the
 * last dump performed by either overload.
 *
 * <p>NOTE(review): {@code updatedAt} is a plain field with no synchronization; this class appears
 * to assume single-threaded use — confirm against callers.
 */
public class ZookeeperStateSerializer implements StateSerializer {
    private static final ObjectMapper MAPPER = new ObjectMapper();
    private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperStateSerializer.class);

    /** Root Zookeeper path under which all connector state nodes live. */
    private static final String ROOT_PATH = "/couchbase-kafka-connector2";
    /** JSON field holding the vbucket UUID of a stream. */
    private static final String VBUCKET_UUID_FIELD = "vbucketUUID";
    /** JSON field holding the sequence number of a stream. */
    private static final String SEQUENCE_NUMBER_FIELD = "sequenceNumber";
    /** Session timeout handed to {@link ZkClient}, in milliseconds. */
    private static final int SESSION_TIMEOUT_MS = 4000;
    /** Connection timeout handed to {@link ZkClient}, in milliseconds. */
    private static final int CONNECTION_TIMEOUT_MS = 6000;

    private final ZkClient zkClient;
    private final String bucket;
    private final long stateSerializationThreshold;
    /** Wall-clock time (ms since epoch) of the last dump that was actually performed. */
    private long updatedAt = 0;

    /**
     * Connects to the Zookeeper ensemble named by the environment and captures the bucket name
     * and the dump throttling threshold.
     *
     * @param couchbaseKafkaEnvironment source of the Zookeeper address, bucket name and threshold
     */
    public ZookeeperStateSerializer(CouchbaseKafkaEnvironment couchbaseKafkaEnvironment) {
        this.zkClient = new ZkClient(
                couchbaseKafkaEnvironment.kafkaZookeeperAddress(),
                SESSION_TIMEOUT_MS,
                CONNECTION_TIMEOUT_MS,
                ZKStringSerializer$.MODULE$);
        this.bucket = couchbaseKafkaEnvironment.couchbaseBucket();
        this.stateSerializationThreshold = couchbaseKafkaEnvironment.couchbaseStateSerializationThreshold();
    }

    /**
     * Writes the state of every stream in {@code connectorState} to Zookeeper, unless a dump was
     * already performed within the configured threshold.
     *
     * @param connectorState the state whose streams should be persisted
     */
    @Override
    public void dump(ConnectorState connectorState) {
        long now = System.currentTimeMillis();
        if (!isDumpDue(now)) {
            return;
        }
        Iterator<StreamState> streams = connectorState.iterator();
        while (streams.hasNext()) {
            writeState(streams.next());
        }
        this.updatedAt = now;
    }

    /**
     * Writes the state of a single partition to Zookeeper, unless a dump was already performed
     * within the configured threshold.
     *
     * <p>If {@code connectorState} holds no state for the partition, nothing is written and the
     * throttle timestamp is left untouched.
     *
     * @param connectorState the state to read the partition's stream state from
     * @param s              the partition to persist
     */
    @Override
    public void dump(ConnectorState connectorState, short s) {
        long now = System.currentTimeMillis();
        if (!isDumpDue(now)) {
            return;
        }
        StreamState streamState = connectorState.get(s);
        if (streamState == null) {
            // Nothing to persist; previously this surfaced as a NullPointerException in writeState.
            LOGGER.warn("No state available for partition {}, skipping dump", s);
            return;
        }
        writeState(streamState);
        this.updatedAt = now;
    }

    /**
     * Refreshes every stream currently present in {@code connectorState} with the state stored
     * in Zookeeper, where such state exists and can be decoded.
     *
     * @param connectorState the state to update in place
     * @return the same {@code connectorState} instance
     */
    @Override
    public ConnectorState load(ConnectorState connectorState) {
        // NOTE(review): put() is called while iterating connectorState; this relies on put()
        // of an already-present partition not invalidating the iterator — confirm.
        Iterator<StreamState> streams = connectorState.iterator();
        while (streams.hasNext()) {
            StreamState stored = load(connectorState, streams.next().partition());
            if (stored != null) {
                connectorState.put(stored);
            }
        }
        return connectorState;
    }

    /**
     * Reads the stored state of one partition from Zookeeper.
     *
     * @param connectorState unused by this implementation
     * @param s              the partition to load
     * @return the decoded state, or {@code null} when nothing is stored for the partition or the
     *         stored content is not a JSON object containing both expected fields
     */
    @Override
    public StreamState load(ConnectorState connectorState, short s) {
        // The 'true' flag asks ZkClient to return null instead of throwing when the node is absent.
        String json = (String) this.zkClient.readData(pathForState(s), true);
        if (json == null) {
            return null;
        }
        try {
            JsonNode root = MAPPER.readTree(json);
            // JsonNode.get returns null for a missing field; guard so that incomplete or foreign
            // node content is reported instead of failing with a NullPointerException.
            JsonNode vbucketUuid = root == null ? null : root.get(VBUCKET_UUID_FIELD);
            JsonNode sequenceNumber = root == null ? null : root.get(SEQUENCE_NUMBER_FIELD);
            if (vbucketUuid == null || sequenceNumber == null) {
                LOGGER.warn("Ignoring incomplete state for partition {}: {}", s, json);
                return null;
            }
            return new StreamState(s, vbucketUuid.asLong(0L), sequenceNumber.asLong(0L));
        } catch (IOException e) {
            LOGGER.warn("Error while decoding state", e);
            return null;
        }
    }

    /** Returns {@code true} when more than the configured threshold has elapsed since the last dump. */
    private boolean isDumpDue(long now) {
        return now - this.updatedAt > this.stateSerializationThreshold;
    }

    /**
     * Builds the Zookeeper path of the node holding the given partition's state.
     *
     * <p>Plain concatenation is used rather than {@code String.format} so the partition number is
     * always rendered with ASCII digits, independent of the JVM's default locale.
     */
    private String pathForState(short s) {
        return ROOT_PATH + "/" + this.bucket + "/" + s;
    }

    /** Serializes one stream's state to JSON and stores it in that partition's Zookeeper node. */
    private void writeState(StreamState streamState) {
        ObjectNode json = MAPPER.createObjectNode();
        json.put(VBUCKET_UUID_FIELD, streamState.vbucketUUID());
        json.put(SEQUENCE_NUMBER_FIELD, streamState.sequenceNumber());

        String path = pathForState(streamState.partition());
        // Ensure the node (and its parents) exist before writing the payload.
        this.zkClient.createPersistent(path, true);
        this.zkClient.writeData(path, json.toString());
    }
}
