/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.coordinator.common.runtime;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.StreamSupport;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.coordinator.common.runtime.PartitionWriter;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.storage.internals.log.VerificationGuard;

public class InMemoryPartitionWriter
implements PartitionWriter {
    private final boolean autoCommit;
    private final Map<TopicPartition, PartitionState> partitions;

    public InMemoryPartitionWriter(boolean autoCommit) {
        this.autoCommit = autoCommit;
        this.partitions = new ConcurrentHashMap<TopicPartition, PartitionState>();
    }

    private PartitionState partitionState(TopicPartition tp) {
        return this.partitions.computeIfAbsent(tp, __ -> new PartitionState());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void registerListener(TopicPartition tp, PartitionWriter.Listener listener) {
        PartitionState state = this.partitionState(tp);
        state.lock.lock();
        try {
            state.listeners.add(listener);
        }
        finally {
            state.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void deregisterListener(TopicPartition tp, PartitionWriter.Listener listener) {
        PartitionState state = this.partitionState(tp);
        state.lock.lock();
        try {
            state.listeners.remove(listener);
        }
        finally {
            state.lock.unlock();
        }
    }

    public LogConfig config(TopicPartition tp) {
        return new LogConfig(Collections.emptyMap());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public long append(TopicPartition tp, VerificationGuard verificationGuard, MemoryRecords batch) {
        PartitionState state = this.partitionState(tp);
        state.lock.lock();
        try {
            ByteBuffer buffer = ByteBuffer.allocate(batch.sizeInBytes());
            batch.firstBatch().writeTo(buffer);
            buffer.flip();
            state.entries.add(MemoryRecords.readableRecords((ByteBuffer)buffer));
            state.endOffset += StreamSupport.stream(batch.records().spliterator(), false).count();
            if (this.autoCommit) {
                this.commit(tp, state.endOffset);
            }
            long l = state.endOffset;
            return l;
        }
        finally {
            state.lock.unlock();
        }
    }

    public CompletableFuture<Void> deleteRecords(TopicPartition tp, long deleteBeforeOffset) throws KafkaException {
        throw new RuntimeException("method not implemented");
    }

    public CompletableFuture<VerificationGuard> maybeStartTransactionVerification(TopicPartition tp, String transactionalId, long producerId, short producerEpoch, short apiVersion) throws KafkaException {
        return CompletableFuture.completedFuture(new VerificationGuard());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void commit(TopicPartition tp, long offset) {
        PartitionState state = this.partitionState(tp);
        state.lock.lock();
        try {
            state.committedOffset = offset;
            state.listeners.forEach(listener -> listener.onHighWatermarkUpdated(tp, state.committedOffset));
        }
        finally {
            state.lock.unlock();
        }
    }

    public void commit(TopicPartition tp) {
        PartitionState state = this.partitionState(tp);
        state.lock.lock();
        try {
            state.committedOffset = state.endOffset;
            state.listeners.forEach(listener -> listener.onHighWatermarkUpdated(tp, state.committedOffset));
        }
        finally {
            state.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public List<MemoryRecords> entries(TopicPartition tp) {
        PartitionState state = this.partitionState(tp);
        state.lock.lock();
        try {
            List<MemoryRecords> list = Collections.unmodifiableList(state.entries);
            return list;
        }
        finally {
            state.lock.unlock();
        }
    }

    private static class PartitionState {
        private final ReentrantLock lock = new ReentrantLock();
        private final List<PartitionWriter.Listener> listeners = new ArrayList<PartitionWriter.Listener>();
        private final List<MemoryRecords> entries = new ArrayList<MemoryRecords>();
        private long endOffset = 0L;
        private long committedOffset = 0L;

        private PartitionState() {
        }
    }
}

