package org.apache.shardingsphere.scaling.core.common.channel.distribution;

import java.util.BitSet;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Generated;
import org.apache.shardingsphere.scaling.core.common.channel.AckCallback;
import org.apache.shardingsphere.scaling.core.common.channel.Channel;
import org.apache.shardingsphere.scaling.core.common.record.DataRecord;
import org.apache.shardingsphere.scaling.core.common.record.FinishedRecord;
import org.apache.shardingsphere.scaling.core.common.record.PlaceholderRecord;
import org.apache.shardingsphere.scaling.core.common.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/shardingsphere/scaling/core/common/channel/distribution/DistributionChannel.class */
public final class DistributionChannel implements Channel {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(DistributionChannel.class);
    private final int channelNumber;
    private final BitSetChannel[] channels;
    private final AckCallback ackCallback;
    private long lastAckIndex;
    private ScheduledExecutorService scheduleAckRecordsExecutor;
    private final BitSetChannel autoAckChannel = new AutoAcknowledgeChannel();
    private final Map<String, Integer> channelAssignment = new HashMap();
    private final AtomicLong indexAutoIncreaseGenerator = new AtomicLong();
    private final Queue<Integer> toBeAckBitSetIndexes = new ConcurrentLinkedQueue();

    public DistributionChannel(int i, AckCallback ackCallback) {
        this.channelNumber = i;
        this.ackCallback = ackCallback;
        this.channels = new BitSetChannel[i];
        for (int i2 = 0; i2 < i; i2++) {
            this.channels[i2] = new BlockingQueueChannel();
        }
        scheduleAckRecords();
    }

    private void scheduleAckRecords() {
        this.scheduleAckRecordsExecutor = Executors.newSingleThreadScheduledExecutor();
        this.scheduleAckRecordsExecutor.scheduleWithFixedDelay(this::ackRecords0, 5L, 1L, TimeUnit.SECONDS);
    }

    @Override // org.apache.shardingsphere.scaling.core.common.channel.Channel
    public void pushRecord(Record record) throws InterruptedException {
        if (FinishedRecord.class.equals(record.getClass())) {
            for (int i = 0; i < this.channels.length; i++) {
                pushRecord(record, i);
            }
            return;
        }
        if (DataRecord.class.equals(record.getClass())) {
            pushRecord(record, Math.abs(record.hashCode() % this.channelNumber));
        } else {
            if (!PlaceholderRecord.class.equals(record.getClass())) {
                throw new RuntimeException("Not Support Record Type");
            }
            pushRecord(record, -1);
        }
    }

    private void pushRecord(Record record, int i) throws InterruptedException {
        this.toBeAckBitSetIndexes.add(Integer.valueOf(i));
        getBitSetChannel(Integer.valueOf(i)).pushRecord(record, this.indexAutoIncreaseGenerator.getAndIncrement());
    }

    @Override // org.apache.shardingsphere.scaling.core.common.channel.Channel
    public List<Record> fetchRecords(int i, int i2) {
        return findChannel().fetchRecords(i, i2);
    }

    @Override // org.apache.shardingsphere.scaling.core.common.channel.Channel
    public void ack() {
        findChannel().ack();
    }

    private synchronized void ackRecords0() {
        try {
            int shouldAckCount = shouldAckCount();
            if (0 == shouldAckCount) {
                return;
            }
            this.ackCallback.onAck(fetchAckRecords(shouldAckCount));
            this.lastAckIndex += shouldAckCount;
            for (BitSetChannel bitSetChannel : this.channels) {
                bitSetChannel.clear(this.lastAckIndex);
            }
        } catch (Exception e) {
            log.error("distribution channel auto ack failed.", e);
        }
    }

    private int shouldAckCount() {
        BitSet ackBitSet = this.autoAckChannel.getAckBitSet(this.lastAckIndex);
        for (BitSetChannel bitSetChannel : this.channels) {
            ackBitSet.or(bitSetChannel.getAckBitSet(this.lastAckIndex));
        }
        return ackBitSet.nextClearBit(0);
    }

    private List<Record> fetchAckRecords(int i) {
        LinkedList linkedList = new LinkedList();
        for (int i2 = 0; i2 < i; i2++) {
            linkedList.add(getBitSetChannel(this.toBeAckBitSetIndexes.remove()).removeAckRecord());
        }
        return linkedList;
    }

    private BitSetChannel getBitSetChannel(Integer num) {
        return num.intValue() == -1 ? this.autoAckChannel : this.channels[num.intValue()];
    }

    private BitSetChannel findChannel() {
        String l = Long.toString(Thread.currentThread().getId());
        checkAssignment(l);
        return this.channels[this.channelAssignment.get(l).intValue()];
    }

    private void checkAssignment(String str) {
        if (this.channelAssignment.containsKey(str)) {
            return;
        }
        synchronized (this) {
            if (!this.channelAssignment.containsKey(str)) {
                assignmentChannel(str);
            }
        }
    }

    private void assignmentChannel(String str) {
        for (int i = 0; i < this.channels.length; i++) {
            if (!this.channelAssignment.containsValue(Integer.valueOf(i))) {
                this.channelAssignment.put(str, Integer.valueOf(i));
                return;
            }
        }
    }

    @Override // org.apache.shardingsphere.scaling.core.common.channel.Channel
    public void close() {
        this.scheduleAckRecordsExecutor.shutdown();
        ackRecords0();
        for (BitSetChannel bitSetChannel : this.channels) {
            bitSetChannel.close();
        }
        this.toBeAckBitSetIndexes.clear();
    }
}
