package org.apache.shardingsphere.data.pipeline.cdc.client.handler;

import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient;
import org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/shardingsphere/data/pipeline/cdc/client/handler/RetryStreamingExceptionHandler.class */
public final class RetryStreamingExceptionHandler implements ExceptionHandler {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(RetryStreamingExceptionHandler.class);
    private final CDCClient cdcClient;
    private final int maxRetryTimes;
    private final int retryIntervalMills;
    private final AtomicInteger retryTimes = new AtomicInteger(0);

    public RetryStreamingExceptionHandler(CDCClient cDCClient, int i, int i2) {
        this.cdcClient = cDCClient;
        this.maxRetryTimes = i;
        this.retryIntervalMills = i2;
    }

    @Override // org.apache.shardingsphere.data.pipeline.cdc.client.handler.ExceptionHandler
    public void handleException(ChannelHandlerContext channelHandlerContext, Throwable th) {
        log.error("Catch exception: ", th);
        reconnect(channelHandlerContext);
    }

    private void reconnect(ChannelHandlerContext channelHandlerContext) {
        try {
            this.retryTimes.incrementAndGet();
            ClientConnectionContext clientConnectionContext = (ClientConnectionContext) channelHandlerContext.channel().attr(ClientConnectionContext.CONTEXT_KEY).get();
            if (this.retryTimes.get() > this.maxRetryTimes) {
                log.warn("Stop try to reconnect, stop streaming ids: {}", clientConnectionContext.getStreamingIds());
                clientConnectionContext.getStreamingIds().forEach(str -> {
                    CompletableFuture.runAsync(() -> {
                        this.cdcClient.stopStreaming(str);
                    });
                });
            } else {
                TimeUnit.MILLISECONDS.sleep(this.retryIntervalMills);
                log.info("Retry to restart streaming, retry times: {}", Integer.valueOf(this.retryTimes.get()));
                clientConnectionContext.getStreamingIds().forEach(str2 -> {
                    CompletableFuture.runAsync(() -> {
                        this.cdcClient.restartStreaming(str2);
                    });
                });
            }
        } catch (InterruptedException e) {
            throw e;
        }
    }
}
