/*
 * Decompiled with CFR 0.152.
 */
package com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage;

import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.admin.NewTopic;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.producer.RecordMetadata;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.config.ConfigException;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.serialization.ByteArraySerializer;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.utils.Time;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.runtime.WorkerConfig;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.OffsetBackingStore;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.util.Callback;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.util.ConnectUtils;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.util.ConvertingFutureCallback;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.util.KafkaBasedLog;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.util.TopicAdmin;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link OffsetBackingStore} that persists offsets through a {@link KafkaBasedLog} over the topic
 * named by the {@code offset.storage.topic} setting and mirrors every consumed record in an
 * in-memory map.
 *
 * <p>Lifecycle as used by this class: {@link #configure(WorkerConfig)} builds the log,
 * {@link #start()} / {@link #stop()} delegate to it, {@link #get(Collection)} answers from the
 * in-memory map after asking the log to read to its end, and {@link #set(Map, Callback)} sends
 * one record per entry.
 *
 * <p>NOTE(review): the in-memory map is a plain {@link HashMap}; this class presumes the
 * consumed-record callback and the read-to-end callback are invoked on the same thread by
 * {@code KafkaBasedLog} - confirm before touching {@code data} from anywhere else.
 */
public class KafkaOffsetBackingStore implements OffsetBackingStore {
    private static final Logger log = LoggerFactory.getLogger(KafkaOffsetBackingStore.class);

    /** Log over the offsets topic; {@code null} until {@link #configure(WorkerConfig)} runs. */
    private KafkaBasedLog<byte[], byte[]> offsetLog;

    /** Latest value seen for each key consumed from the log (re-created on every configure). */
    private Map<ByteBuffer, ByteBuffer> data;

    /** Stores each consumed record in {@link #data}, wrapping non-null key/value byte arrays. */
    private final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = new Callback<ConsumerRecord<byte[], byte[]>>() {

        @Override
        public void onCompletion(Throwable error, ConsumerRecord<byte[], byte[]> record) {
            // A reported failure carries no usable record; dereferencing it would only turn the
            // original error into a NullPointerException.
            if (error != null) {
                log.error("Failed to read from the offsets topic", error);
                return;
            }
            if (record == null) {
                log.warn("Ignoring null record consumed from the offsets topic");
                return;
            }
            byte[] rawKey = record.key();
            byte[] rawValue = record.value();
            ByteBuffer key = rawKey != null ? ByteBuffer.wrap(rawKey) : null;
            ByteBuffer value = rawValue != null ? ByteBuffer.wrap(rawValue) : null;
            KafkaOffsetBackingStore.this.data.put(key, value);
        }
    };

    /**
     * Builds the producer, consumer and admin property maps from the worker configuration and
     * creates the backing {@link KafkaBasedLog}.
     *
     * @param config worker configuration; must define a non-blank {@code offset.storage.topic}
     * @throws ConfigException if the offset storage topic is missing or blank
     */
    @Override
    public void configure(WorkerConfig config) {
        String topic = config.getString("offset.storage.topic");
        if (topic == null || topic.trim().isEmpty()) {
            throw new ConfigException("Offset storage topic must be specified");
        }
        String clusterId = ConnectUtils.lookupKafkaClusterId(config);
        this.data = new HashMap<>();

        // Each client gets its own copy of the original settings so the overrides below do not
        // leak between producer, consumer and admin client.
        Map<String, Object> originals = config.originals();

        Map<String, Object> producerProps = new HashMap<>(originals);
        producerProps.put("key.serializer", ByteArraySerializer.class.getName());
        producerProps.put("value.serializer", ByteArraySerializer.class.getName());
        producerProps.put("delivery.timeout.ms", Integer.MAX_VALUE);
        ConnectUtils.addMetricsContextProperties(producerProps, config, clusterId);

        Map<String, Object> consumerProps = new HashMap<>(originals);
        consumerProps.put("key.deserializer", ByteArrayDeserializer.class.getName());
        consumerProps.put("value.deserializer", ByteArrayDeserializer.class.getName());
        ConnectUtils.addMetricsContextProperties(consumerProps, config, clusterId);

        Map<String, Object> adminProps = new HashMap<>(originals);
        ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);

        // Topic-level overrides are only available from a distributed worker configuration.
        Map<String, Object> topicSettings = config instanceof DistributedConfig
                ? ((DistributedConfig) config).offsetStorageTopicSettings()
                : Collections.<String, Object>emptyMap();
        NewTopic topicDescription = TopicAdmin.defineTopic(topic)
                .config(topicSettings)
                .compacted()
                .partitions(config.getInt("offset.storage.partitions"))
                .replicationFactor(config.getShort("offset.storage.replication.factor"))
                .build();

        this.offsetLog = this.createKafkaBasedLog(topic, producerProps, consumerProps, this.consumedCallback, topicDescription, adminProps);
    }

    /**
     * Creates the {@link KafkaBasedLog} together with the initializer task handed to it.
     *
     * <p>The initializer opens a {@link TopicAdmin}, requests creation of the topic and, when the
     * topic is not reported as newly created, verifies that its cleanup policy is compact-only.
     */
    private KafkaBasedLog<byte[], byte[]> createKafkaBasedLog(final String topic, Map<String, Object> producerProps, Map<String, Object> consumerProps, Callback<ConsumerRecord<byte[], byte[]>> consumedCallback, final NewTopic topicDescription, final Map<String, Object> adminProps) {
        Runnable createTopics = new Runnable() {

            @Override
            public void run() {
                log.debug("Creating admin client to manage Connect internal offset topic");
                try (TopicAdmin admin = new TopicAdmin(adminProps)) {
                    Set<String> newTopics = admin.createTopics(topicDescription);
                    if (!newTopics.contains(topic)) {
                        // The topic was not created by this call, so its settings are unknown.
                        log.debug("Using admin client to check cleanup policy for '{}' topic is '{}'", topic, "compact");
                        admin.verifyTopicCleanupPolicyOnlyCompact(topic, "offset.storage.topic", "source connector offsets");
                    }
                }
            }
        };
        return new KafkaBasedLog<byte[], byte[]>(topic, producerProps, consumerProps, consumedCallback, Time.SYSTEM, createTopics);
    }

    /** Starts the underlying log. Requires a prior call to {@link #configure(WorkerConfig)}. */
    @Override
    public void start() {
        log.info("Starting KafkaOffsetBackingStore");
        this.offsetLog.start();
        log.info("Finished reading offsets topic and starting KafkaOffsetBackingStore");
    }

    /** Stops the underlying log. Requires a prior call to {@link #configure(WorkerConfig)}. */
    @Override
    public void stop() {
        log.info("Stopping KafkaOffsetBackingStore");
        this.offsetLog.stop();
        log.info("Stopped KafkaOffsetBackingStore");
    }

    /**
     * Requests a read-to-end of the log and returns a future for the values of {@code keys}.
     *
     * @param keys keys to look up
     * @return future whose result maps every requested key to its value in the in-memory map, or
     *         to {@code null} when the map has no entry for it
     */
    @Override
    public Future<Map<ByteBuffer, ByteBuffer>> get(final Collection<ByteBuffer> keys) {
        ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>> future = new ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>>() {

            @Override
            public Map<ByteBuffer, ByteBuffer> convert(Void result) {
                Map<ByteBuffer, ByteBuffer> values = new HashMap<>();
                for (ByteBuffer key : keys) {
                    values.put(key, KafkaOffsetBackingStore.this.data.get(key));
                }
                return values;
            }
        };
        this.offsetLog.readToEnd(future);
        return future;
    }

    /**
     * Sends one record per entry of {@code values} to the log.
     *
     * <p>An empty {@code values} map produces an already-completed future and an immediate
     * success notification, because no producer callback would ever arrive to complete it.
     *
     * @param values   key/value pairs to write; a {@code null} key or value is sent as {@code null}
     * @param callback notified once with {@code (null, null)} when every record was acknowledged,
     *                 or once with the first reported failure; may be {@code null}
     * @return future that completes together with the callback notification
     */
    @Override
    public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback) {
        SetCallbackFuture producerCallback = new SetCallbackFuture(values.size(), callback);
        for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) {
            this.offsetLog.send(toByteArray(entry.getKey()), toByteArray(entry.getValue()), producerCallback);
        }
        return producerCallback;
    }

    /**
     * Returns the readable content of {@code buffer} as a byte array, or {@code null} for a
     * {@code null} buffer.
     *
     * <p>The backing array is returned directly only when it is accessible and is exactly the
     * buffer's content. Otherwise (sliced, partially consumed, direct or read-only buffers) the
     * remaining bytes are copied without disturbing the caller's position, so the bytes sent
     * match the content {@link ByteBuffer#equals(Object)} compares.
     */
    private static byte[] toByteArray(ByteBuffer buffer) {
        if (buffer == null) {
            return null;
        }
        if (buffer.hasArray()
                && buffer.arrayOffset() == 0
                && buffer.position() == 0
                && buffer.remaining() == buffer.array().length) {
            return buffer.array();
        }
        byte[] copy = new byte[buffer.remaining()];
        buffer.duplicate().get(copy);
        return copy;
    }

    /**
     * Producer callback shared by all records of one {@link #set(Map, Callback)} call; doubles as
     * the {@link Future} returned to the caller.
     *
     * <p>It completes successfully after {@code numRecords} successful acknowledgements, or
     * exceptionally on the first reported failure. All state is guarded by this object's monitor.
     * Cancellation is not supported.
     */
    private static class SetCallbackFuture
            implements com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.producer.Callback,
            Future<Void> {
        private final Callback<Void> callback;
        private int numLeft;
        private boolean completed;
        private Throwable exception;

        /**
         * @param numRecords number of successful acknowledgements to wait for; zero or less
         *                   completes the future (and notifies {@code callback}) immediately
         * @param callback   completion listener, may be {@code null}
         */
        public SetCallbackFuture(int numRecords, Callback<Void> callback) {
            this.numLeft = numRecords;
            this.callback = callback;
            if (numRecords <= 0) {
                // Nothing will ever be acknowledged, so waiting would block forever.
                this.complete(null);
            }
        }

        @Override
        public synchronized void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                // Only the first failure is reported; later ones are dropped.
                if (!this.completed) {
                    this.complete(exception);
                }
                return;
            }
            --this.numLeft;
            if (this.numLeft == 0 && !this.completed) {
                this.complete(null);
            }
        }

        /**
         * Records the outcome, wakes every waiter and then notifies the listener.
         *
         * <p>State is published before the listener runs so that a throwing listener cannot leave
         * the future permanently incomplete. Waiters still cannot return before the listener has
         * finished, because they must re-acquire the monitor held by this method.
         */
        private synchronized void complete(Throwable failure) {
            this.exception = failure;
            this.completed = true;
            // notifyAll: several threads may be blocked in get(); notify() would wake only one.
            this.notifyAll();
            if (this.callback != null) {
                this.callback.onCompletion(failure, null);
            }
        }

        /** Cancellation is not supported; always returns {@code false}. */
        @Override
        public synchronized boolean cancel(boolean mayInterruptIfRunning) {
            return false;
        }

        /** Always {@code false}, since {@link #cancel(boolean)} never succeeds. */
        @Override
        public synchronized boolean isCancelled() {
            return false;
        }

        @Override
        public synchronized boolean isDone() {
            return this.completed;
        }

        /**
         * Blocks until the future completes.
         *
         * @throws ExecutionException wrapping the first failure reported by the producer
         */
        @Override
        public synchronized Void get() throws InterruptedException, ExecutionException {
            while (!this.completed) {
                this.wait();
            }
            return this.result();
        }

        /**
         * Blocks until the future completes or the timeout elapses.
         *
         * @throws TimeoutException   if the future is still incomplete once the timeout elapsed
         * @throws ExecutionException wrapping the first failure reported by the producer
         */
        @Override
        public synchronized Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            // nanoTime is monotonic; comparing via subtraction stays correct on wrap-around.
            long remainingNanos = unit.toNanos(timeout);
            final long deadline = System.nanoTime() + remainingNanos;
            while (!this.completed) {
                // "<= 0" matters: Object.wait(0) means "wait forever", not "do not wait".
                if (remainingNanos <= 0L) {
                    throw new TimeoutException("KafkaOffsetBackingStore Future timed out.");
                }
                TimeUnit.NANOSECONDS.timedWait(this, remainingNanos);
                remainingNanos = deadline - System.nanoTime();
            }
            return this.result();
        }

        /** Returns {@code null} on success or throws the recorded failure; caller holds the lock. */
        private Void result() throws ExecutionException {
            if (this.exception != null) {
                throw new ExecutionException(this.exception);
            }
            return null;
        }
    }
}

