/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.v3_0_8.client.impl;

import java.time.Clock;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.v3_0_8.client.impl.MessageRouterBase;
import org.apache.pulsar.v3_0_8.client.util.MathUtils;

public class RoundRobinPartitionMessageRouterImpl
extends MessageRouterBase {
    private static final long serialVersionUID = 1L;
    private static final AtomicIntegerFieldUpdater<RoundRobinPartitionMessageRouterImpl> PARTITION_INDEX_UPDATER = AtomicIntegerFieldUpdater.newUpdater(RoundRobinPartitionMessageRouterImpl.class, "partitionIndex");
    private volatile int partitionIndex = 0;
    private final int startPtnIdx;
    private final boolean isBatchingEnabled;
    private final long partitionSwitchMs;
    private final Clock clock;
    private static final Clock SYSTEM_CLOCK = Clock.systemUTC();

    public RoundRobinPartitionMessageRouterImpl(HashingScheme hashingScheme, int startPtnIdx, boolean isBatchingEnabled, long partitionSwitchMs) {
        this(hashingScheme, startPtnIdx, isBatchingEnabled, partitionSwitchMs, SYSTEM_CLOCK);
    }

    public RoundRobinPartitionMessageRouterImpl(HashingScheme hashingScheme, int startPtnIdx, boolean isBatchingEnabled, long partitionSwitchMs, Clock clock) {
        super(hashingScheme);
        PARTITION_INDEX_UPDATER.set(this, startPtnIdx);
        this.startPtnIdx = startPtnIdx;
        this.isBatchingEnabled = isBatchingEnabled;
        this.partitionSwitchMs = Math.max(1L, partitionSwitchMs);
        this.clock = clock;
    }

    public int choosePartition(Message<?> msg, TopicMetadata topicMetadata) {
        if (msg.hasKey()) {
            return MathUtils.signSafeMod(this.hash.makeHash(msg.getKey()), topicMetadata.numPartitions());
        }
        if (this.isBatchingEnabled) {
            long currentMs = this.clock.millis();
            return MathUtils.signSafeMod(currentMs / this.partitionSwitchMs + (long)this.startPtnIdx, topicMetadata.numPartitions());
        }
        return MathUtils.signSafeMod(PARTITION_INDEX_UPDATER.getAndIncrement(this), topicMetadata.numPartitions());
    }
}

