/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.kafka.connect;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.util.MessageIdUtils;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig;
import org.apache.pulsar.io.kafka.connect.PulsarOffsetBackingStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarKafkaSinkTaskContext
implements SinkTaskContext {
    private static final Logger log = LoggerFactory.getLogger(PulsarKafkaSinkTaskContext.class);
    private final Map<String, String> config;
    private final SinkContext ctx;
    private final OffsetBackingStore offsetStore;
    private final String topicNamespace;
    private final Consumer<Collection<TopicPartition>> onPartitionChange;
    private final AtomicBoolean runRepartition = new AtomicBoolean(false);
    private final ConcurrentHashMap<TopicPartition, Long> currentOffsets = new ConcurrentHashMap();

    public PulsarKafkaSinkTaskContext(Map<String, String> config, SinkContext ctx, Consumer<Collection<TopicPartition>> onPartitionChange) {
        this.config = config;
        this.ctx = ctx;
        this.offsetStore = new PulsarOffsetBackingStore(ctx.getPulsarClient());
        PulsarKafkaWorkerConfig pulsarKafkaWorkerConfig = new PulsarKafkaWorkerConfig(config);
        this.offsetStore.configure((WorkerConfig)pulsarKafkaWorkerConfig);
        this.offsetStore.start();
        this.onPartitionChange = onPartitionChange;
        this.topicNamespace = pulsarKafkaWorkerConfig.getString("topic.namespace");
    }

    public void close() {
        this.offsetStore.stop();
    }

    public Map<String, String> configs() {
        return this.config;
    }

    @VisibleForTesting
    protected Long currentOffset(String topic, int partition) {
        return this.currentOffset(new TopicPartition(topic, partition));
    }

    private Long currentOffset(TopicPartition topicPartition) {
        Long offset = this.currentOffsets.computeIfAbsent(topicPartition, kv -> {
            LinkedList req = Lists.newLinkedList();
            ByteBuffer key = this.topicPartitionAsKey(topicPartition);
            req.add(key);
            try {
                Optional<ByteBuffer> val;
                Map result = (Map)this.offsetStore.get((Collection)req).get();
                if (result != null && result.size() != 0 && (val = result.entrySet().stream().filter(entry -> ((ByteBuffer)entry.getKey()).equals(key)).findFirst().map(entry -> (ByteBuffer)entry.getValue())).isPresent()) {
                    long received = val.get().getLong();
                    if (log.isDebugEnabled()) {
                        log.debug("read initial offset for {} == {}", (Object)topicPartition, (Object)received);
                    }
                    return received;
                }
                return -1L;
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("error getting initial state of {}", (Object)topicPartition, (Object)e);
                throw new RuntimeException("error getting initial state of " + topicPartition, e);
            }
            catch (ExecutionException e) {
                log.error("error getting initial state of {}", (Object)topicPartition, (Object)e);
                throw new RuntimeException("error getting initial state of " + topicPartition, e);
            }
        });
        return offset;
    }

    public Map<TopicPartition, OffsetAndMetadata> currentOffsets() {
        HashMap snapshot = Maps.newHashMapWithExpectedSize((int)this.currentOffsets.size());
        this.currentOffsets.forEach((topicPartition, offset) -> {
            if (offset > 0L) {
                snapshot.put(topicPartition, new OffsetAndMetadata(offset.longValue(), Optional.empty(), null));
            }
        });
        return snapshot;
    }

    private ByteBuffer topicPartitionAsKey(TopicPartition topicPartition) {
        return ByteBuffer.wrap((this.topicNamespace + "/" + topicPartition.toString()).getBytes(StandardCharsets.UTF_8));
    }

    private void fillOffsetMap(Map<ByteBuffer, ByteBuffer> offsetMap, TopicPartition topicPartition, long l) {
        ByteBuffer key = this.topicPartitionAsKey(topicPartition);
        ByteBuffer value = ByteBuffer.allocate(8);
        value.putLong(l);
        value.flip();
        offsetMap.put(key, value);
    }

    private void seekAndUpdateOffset(TopicPartition topicPartition, long offset) {
        try {
            this.ctx.seek(topicPartition.topic(), topicPartition.partition(), MessageIdUtils.getMessageId((long)offset));
        }
        catch (PulsarClientException e) {
            log.error("Failed to seek topic {} partition {} offset {}", new Object[]{topicPartition.topic(), topicPartition.partition(), offset, e});
            throw new RuntimeException("Failed to seek topic " + topicPartition.topic() + " partition " + topicPartition.partition() + " offset " + offset, e);
        }
        if (!this.currentOffsets.containsKey(topicPartition)) {
            this.runRepartition.set(true);
        }
        this.currentOffsets.put(topicPartition, offset);
    }

    public void updateLastOffset(TopicPartition topicPartition, long offset) {
        if (!this.currentOffsets.containsKey(topicPartition)) {
            this.runRepartition.set(true);
        }
        this.currentOffsets.put(topicPartition, offset);
        if (this.runRepartition.compareAndSet(true, false)) {
            this.onPartitionChange.accept(this.currentOffsets.keySet());
        }
    }

    public void offset(Map<TopicPartition, Long> map) {
        map.forEach((key, value) -> this.seekAndUpdateOffset((TopicPartition)key, (long)value));
        if (this.runRepartition.compareAndSet(true, false)) {
            this.onPartitionChange.accept(this.currentOffsets.keySet());
        }
    }

    public void offset(TopicPartition topicPartition, long l) {
        this.seekAndUpdateOffset(topicPartition, l);
        if (this.runRepartition.compareAndSet(true, false)) {
            this.onPartitionChange.accept(this.currentOffsets.keySet());
        }
    }

    public void timeout(long l) {
        log.warn("timeout() is called but is not supported currently.");
    }

    public Set<TopicPartition> assignment() {
        return this.currentOffsets.keySet();
    }

    public void pause(TopicPartition ... topicPartitions) {
        for (TopicPartition tp : topicPartitions) {
            try {
                this.ctx.pause(tp.topic(), tp.partition());
            }
            catch (PulsarClientException e) {
                log.error("Failed to pause topic {} partition {}", new Object[]{tp.topic(), tp.partition(), e});
                throw new RuntimeException("Failed to pause topic " + tp.topic() + " partition " + tp.partition(), e);
            }
        }
    }

    public void resume(TopicPartition ... topicPartitions) {
        for (TopicPartition tp : topicPartitions) {
            try {
                this.ctx.resume(tp.topic(), tp.partition());
            }
            catch (PulsarClientException e) {
                log.error("Failed to resume topic {} partition {}", new Object[]{tp.topic(), tp.partition(), e});
                throw new RuntimeException("Failed to resume topic " + tp.topic() + " partition " + tp.partition(), e);
            }
        }
    }

    public void requestCommit() {
        log.warn("requestCommit() is called but is not supported currently.");
    }

    public void flushOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) throws Exception {
        HashMap offsetMap = Maps.newHashMapWithExpectedSize((int)offsets.size());
        offsets.forEach((tp, om) -> this.fillOffsetMap(offsetMap, (TopicPartition)tp, om.offset()));
        CompletableFuture result = new CompletableFuture();
        this.offsetStore.set((Map)offsetMap, (ex, ignore) -> {
            if (ex == null) {
                result.complete(null);
            } else {
                log.error("error flushing offsets for {}", (Object)offsets, (Object)ex);
                result.completeExceptionally(ex);
            }
        });
        result.get();
    }
}

