package org.tikv.cdc;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import org.apache.flink.shaded.guava31.com.google.common.base.Preconditions;
import org.apache.flink.shaded.guava31.com.google.common.collect.Range;
import org.apache.flink.shaded.guava31.com.google.common.collect.TreeMultiset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.TiSession;
import org.tikv.common.key.Key;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.RangeSplitter;
import org.tikv.kvproto.Cdcpb;
import org.tikv.kvproto.Coprocessor;
import org.tikv.kvproto.Kvrpcpb;

/* loaded from: input_file:org/tikv/cdc/CDCClient.class */
/**
 * Change-data-capture client for a single key range.
 *
 * <p>The key range is split by region and one {@link RegionCDCClient} is kept per region. Region
 * clients push {@link CDCEvent}s into a bounded buffer; {@link #get()} drains that buffer one
 * event at a time, returning row events to the caller and handling resolved-ts and error events
 * internally.
 *
 * <p>Most state-mutating methods are {@code synchronized} on this instance. Resolved timestamps
 * are tracked per region in {@code regionToResolvedTs} and mirrored in {@code resolvedTsSet} so
 * that the minimum and maximum can be read directly.
 */
public class CDCClient implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(CDCClient.class);

    /** Number of non-blocking {@code offer} attempts before falling back to a blocking put. */
    private static final int OFFER_ATTEMPTS = 2;

    private final TiSession session;
    private final Coprocessor.KeyRange keyRange;
    private final CDCConfig config;
    private final BlockingQueue<CDCEvent> eventsBuffer;
    private final ConcurrentHashMap<Long, RegionCDCClient> regionClients = new ConcurrentHashMap<>();
    private final Map<Long, Long> regionToResolvedTs = new HashMap<>();
    private final TreeMultiset<Long> resolvedTsSet = TreeMultiset.create();
    private final Consumer<CDCEvent> eventConsumer;
    private boolean started = false;

    /**
     * Creates a client for {@code keyRange} using a default {@link CDCConfig}.
     *
     * @param tiSession session used to reach the cluster
     * @param keyRange key range to capture changes for
     */
    public CDCClient(TiSession tiSession, Coprocessor.KeyRange keyRange) {
        this(tiSession, keyRange, new CDCConfig());
    }

    /**
     * Creates a client for {@code keyRange}.
     *
     * @param tiSession session used to reach the cluster; its isolation level must be SI
     * @param keyRange key range to capture changes for
     * @param cDCConfig configuration; its event buffer size bounds the internal event queue
     * @throws IllegalStateException if the session's isolation level is not SI
     */
    public CDCClient(TiSession tiSession, Coprocessor.KeyRange keyRange, CDCConfig cDCConfig) {
        Preconditions.checkState(
                tiSession.getConf().getIsolationLevel().equals(Kvrpcpb.IsolationLevel.SI),
                "Unsupported Isolation Level");
        this.session = tiSession;
        this.keyRange = keyRange;
        this.config = cDCConfig;
        this.eventsBuffer = new LinkedBlockingQueue<>(cDCConfig.getEventBufferSize());
        this.eventConsumer = this::enqueueEvent;
    }

    /**
     * Starts capturing by creating region clients for every region overlapping the key range.
     *
     * @param startTs timestamp handed to each region client and used as its initial resolved ts
     * @throws IllegalStateException if the client was already started
     */
    public synchronized void start(long startTs) {
        Preconditions.checkState(!this.started, "Client is already started");
        applyKeyRange(this.keyRange, startTs);
        this.started = true;
    }

    /**
     * Polls one buffered event without blocking.
     *
     * <p>Row events are returned. Resolved-ts and error events are handled internally and yield
     * {@code null}, so a {@code null} result does not necessarily mean the buffer was empty.
     *
     * @return the row of the polled event, or {@code null} if the buffer was empty or the polled
     *     event was not a row event
     * @throws InterruptedException declared for interface compatibility with existing callers
     */
    public synchronized Cdcpb.Event.Row get() throws InterruptedException {
        CDCEvent event = this.eventsBuffer.poll();
        if (event == null) {
            return null;
        }
        switch (event.eventType) {
            case ROW:
                return event.row;
            case RESOLVED_TS:
                handleResolvedTs(event.regionId, event.resolvedTs);
                return null;
            case ERROR:
                handleErrorEvent(event.regionId, event.error, event.resolvedTs);
                return null;
            default:
                return null;
        }
    }

    /**
     * Returns the smallest resolved timestamp among the tracked regions.
     *
     * @return the minimum resolved ts
     * @throws NullPointerException if no region is currently tracked
     */
    public synchronized long getMinResolvedTs() {
        return this.resolvedTsSet.firstEntry().getElement();
    }

    /**
     * Returns the largest resolved timestamp among the tracked regions.
     *
     * @return the maximum resolved ts
     * @throws NullPointerException if no region is currently tracked
     */
    public synchronized long getMaxResolvedTs() {
        return this.resolvedTsSet.lastEntry().getElement();
    }

    /** Closes and forgets every region client. */
    @Override // java.lang.AutoCloseable
    public synchronized void close() {
        removeRegions(this.regionClients.keySet());
    }

    /**
     * Buffers an event produced by a region client: tries a couple of non-blocking offers first,
     * then blocks until space is available.
     */
    private void enqueueEvent(CDCEvent event) {
        for (int attempt = 0; attempt < OFFER_ATTEMPTS; attempt++) {
            if (this.eventsBuffer.offer(event)) {
                return;
            }
        }
        try {
            this.eventsBuffer.put(event);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can observe the interruption.
            Thread.currentThread().interrupt();
            LOGGER.warn("interrupted while buffering a CDC event, the event is dropped", e);
        }
    }

    /**
     * Reconciles the region clients with the regions currently covering {@code keyRange}.
     *
     * <p>Regions without a client get a new one; clients whose region is gone are removed; clients
     * that are no longer running are replaced. Running clients for still-existing regions are left
     * untouched.
     *
     * @param keyRange key range to split by region
     * @param timestamp start timestamp for newly created region clients
     */
    private synchronized void applyKeyRange(Coprocessor.KeyRange keyRange, long timestamp) {
        Iterator<TiRegion> newRegions =
                RangeSplitter.newSplitter(this.session.getRegionManager())
                        .splitRangeByRegion(Arrays.asList(keyRange))
                        .stream()
                        .map(task -> task.getRegion())
                        .sorted((a, b) -> Long.compare(a.getId(), b.getId()))
                        .iterator();
        // The merge below requires BOTH sequences in ascending region-id order. regionClients is
        // a ConcurrentHashMap, whose iteration order is unspecified, so sort explicitly.
        Iterator<RegionCDCClient> oldClients =
                this.regionClients.values().stream()
                        .sorted((a, b) -> Long.compare(a.getRegion().getId(), b.getRegion().getId()))
                        .iterator();

        ArrayList<TiRegion> regionsToAdd = new ArrayList<>();
        ArrayList<Long> regionsToRemove = new ArrayList<>();

        TiRegion newRegion = nextOrNull(newRegions);
        RegionCDCClient oldClient = nextOrNull(oldClients);
        while (newRegion != null && oldClient != null) {
            long newId = newRegion.getId();
            long oldId = oldClient.getRegion().getId();
            if (newId == oldId) {
                // Same region on both sides: only restart it when its client has stopped.
                if (!oldClient.isRunning()) {
                    regionsToRemove.add(newId);
                    regionsToAdd.add(newRegion);
                }
                newRegion = nextOrNull(newRegions);
                oldClient = nextOrNull(oldClients);
            } else if (newId < oldId) {
                regionsToAdd.add(newRegion);
                newRegion = nextOrNull(newRegions);
            } else {
                regionsToRemove.add(oldId);
                oldClient = nextOrNull(oldClients);
            }
        }
        while (newRegion != null) {
            regionsToAdd.add(newRegion);
            newRegion = nextOrNull(newRegions);
        }
        while (oldClient != null) {
            regionsToRemove.add(oldClient.getRegion().getId());
            oldClient = nextOrNull(oldClients);
        }

        removeRegions(regionsToRemove);
        addRegions(regionsToAdd, timestamp);
        LOGGER.info("keyRange applied");
    }

    /** Returns the iterator's next element, or {@code null} when it is exhausted. */
    private static <T> T nextOrNull(Iterator<T> iterator) {
        return iterator.hasNext() ? iterator.next() : null;
    }

    /**
     * Creates, registers and starts a region client for each given region that overlaps the key
     * range.
     *
     * @param iterable regions to add
     * @param timestamp start timestamp, also recorded as the region's initial resolved ts
     * @throws RuntimeException wrapping any failure while creating or starting a region client
     */
    private synchronized void addRegions(Iterable<TiRegion> iterable, long timestamp) {
        LOGGER.info("add regions: {}, timestamp: {}", iterable, timestamp);
        for (TiRegion region : iterable) {
            if (!overlapWithRegion(region)) {
                continue;
            }
            long regionId = region.getId();
            try {
                String address =
                        this.session
                                .getRegionManager()
                                .getStoreById(region.getLeader().getStoreId())
                                .getStore()
                                .getAddress();
                RegionCDCClient client =
                        new RegionCDCClient(
                                region,
                                this.keyRange,
                                this.session
                                        .getChannelFactory()
                                        .getChannel(address, this.session.getPDClient().getHostMapping()),
                                this.eventConsumer,
                                this.config);
                this.regionClients.put(regionId, client);
                this.regionToResolvedTs.put(regionId, timestamp);
                this.resolvedTsSet.add(timestamp);
                client.start(timestamp);
            } catch (Exception e) {
                LOGGER.error("failed to add region(regionId: {}, reason: {})", regionId, e);
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Closes and unregisters the clients of the given regions, dropping their resolved-ts
     * bookkeeping. Unknown region ids are ignored; a failure to close a client is logged and does
     * not stop the removal of the remaining regions.
     *
     * @param iterable ids of the regions to remove
     */
    private synchronized void removeRegions(Iterable<Long> iterable) {
        LOGGER.info("remove regions: {}", iterable);
        for (long regionId : iterable) {
            RegionCDCClient client = this.regionClients.remove(regionId);
            if (client == null) {
                continue;
            }
            try {
                client.close();
            } catch (Exception e) {
                LOGGER.error("failed to close region client, region id: {}, error: {}", regionId, e);
            } finally {
                // Always drop the bookkeeping, even when close() failed.
                Long resolvedTs = this.regionToResolvedTs.remove(regionId);
                if (resolvedTs != null) {
                    this.resolvedTsSet.remove(resolvedTs);
                }
            }
        }
    }

    /** Returns whether the region's [start, end) key span intersects this client's key range. */
    private boolean overlapWithRegion(TiRegion tiRegion) {
        Range<Key> regionRange =
                Range.closedOpen(
                        Key.toRawKey(tiRegion.getStartKey()), Key.toRawKey(tiRegion.getEndKey()));
        Range<Key> clientRange =
                Range.closedOpen(
                        Key.toRawKey(this.keyRange.getStart()), Key.toRawKey(this.keyRange.getEnd()));
        return !regionRange.intersection(clientRange).isEmpty();
    }

    /**
     * Records a new resolved timestamp for a tracked region.
     *
     * <p>Events for regions that are no longer tracked (for example events still buffered after
     * the region was removed) are ignored; adding them to {@code resolvedTsSet} would leave an
     * entry that nothing ever removes.
     *
     * @param regionId id of the region the event belongs to
     * @param resolvedTs the region's new resolved timestamp
     */
    private void handleResolvedTs(long regionId, long resolvedTs) {
        LOGGER.info("handle resolvedTs: {}, regionId: {}", resolvedTs, regionId);
        Long previous = this.regionToResolvedTs.replace(regionId, resolvedTs);
        if (previous == null) {
            LOGGER.debug("ignore resolvedTs of untracked region, regionId: {}", regionId);
            return;
        }
        this.resolvedTsSet.remove(previous);
        this.resolvedTsSet.add(resolvedTs);
    }

    /**
     * Handles an error reported for a region: reports the failure to the region manager, removes
     * the region's client, and re-applies the whole key range from {@code resolvedTs}.
     *
     * <p>If the region has no client any more, the report and removal steps are skipped and the
     * key range is still re-applied.
     *
     * @param regionId id of the failed region
     * @param error the reported error
     * @param resolvedTs timestamp from which replacement region clients are started
     */
    public void handleErrorEvent(long regionId, Throwable error, long resolvedTs) {
        LOGGER.info("handle error: {}, regionId: {}", error, regionId);
        RegionCDCClient client = this.regionClients.get(regionId);
        if (client != null) {
            this.session.getRegionManager().onRequestFail(client.getRegion());
            removeRegions(Arrays.asList(Long.valueOf(regionId)));
        } else {
            LOGGER.warn("no region client found for failed region, regionId: {}", regionId);
        }
        applyKeyRange(this.keyRange, resolvedTs);
    }
}
