package org.apache.seatunnel.connectors.seatunnel.sls.source;

import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.request.PullLogsRequest;
import com.aliyun.openservices.log.response.PullLogsResponse;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.sls.serialization.FastLogDeserialization;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceReader.class */
/**
 * Source reader that pulls log groups from SLS shards and emits them as {@link SeaTunnelRow}s.
 *
 * <p>Each assigned split gets its own {@link SlsConsumerThread}. The reader hands work to a
 * consumer thread by putting a task on that thread's task queue and, for pulls, waits for the
 * task to finish before moving on to the next split.
 */
public class SlsSourceReader implements SourceReader<SeaTunnelRow, SlsSourceSplit> {
    private static final Logger log = LoggerFactory.getLogger(SlsSourceReader.class);

    /** Time in milliseconds {@link #pollNext} sleeps while no split has been assigned yet. */
    private static final long THREAD_WAIT_TIME = 500;

    private final SourceReader.Context context;
    private final SlsSourceConfig slsSourceConfig;

    /** Flipped to {@code true} by {@link #addSplits}; volatile because pollNext reads it. */
    private volatile boolean running = false;

    /** Splits handed over by {@link #addSplits} that pollNext has not picked up yet. */
    private final LinkedBlockingQueue<SlsSourceSplit> pendingShardsQueue = new LinkedBlockingQueue<>();

    /** Splits this reader is currently pulling from. */
    private final Set<SlsSourceSplit> sourceSplits = new HashSet<>();

    /** Consumer thread per split id. */
    private final Map<String, SlsConsumerThread> consumerThreadMap = new ConcurrentHashMap<>();

    private final ExecutorService executorService =
            Executors.newCachedThreadPool(runnable -> new Thread(runnable, "Sls Source Data Consumer"));

    /** Split copies captured per checkpoint id, kept until that checkpoint is acknowledged. */
    private final Map<Long, Map<String, SlsSourceSplit>> checkpointOffsetMap = new ConcurrentHashMap<>();

    /**
     * Creates a reader.
     *
     * @param slsSourceConfig connector configuration, also passed to every consumer thread
     * @param context reader context used for boundedness checks and end-of-input signalling
     */
    public SlsSourceReader(SlsSourceConfig slsSourceConfig, SourceReader.Context context) {
        this.slsSourceConfig = slsSourceConfig;
        this.context = context;
    }

    /** No-op; consumer threads are created on demand in {@link #pollNext}. */
    public void open() throws Exception {
    }

    /** Interrupts all consumer threads by shutting the executor down immediately. */
    public void close() throws IOException {
        this.executorService.shutdownNow();
    }

    /**
     * Pulls one batch of logs from every assigned split and emits the rows to {@code collector}.
     *
     * <p>While no split has been assigned, sleeps {@link #THREAD_WAIT_TIME} ms and returns. In
     * bounded mode every split that was pulled is marked finished, and once all splits are
     * finished the context is told that no more elements will follow.
     *
     * @param collector sink for the deserialized rows
     * @throws RuntimeException if a pull fails or the calling thread is interrupted while waiting
     */
    public void pollNext(Collector<SeaTunnelRow> collector) throws Exception {
        if (!this.running) {
            Thread.sleep(THREAD_WAIT_TIME);
            return;
        }
        this.pendingShardsQueue.drainTo(this.sourceSplits);
        for (SlsSourceSplit split : this.sourceSplits) {
            this.consumerThreadMap.computeIfAbsent(split.splitId(), splitId -> startConsumerThread());
        }

        FastLogDeserialization<SeaTunnelRow> deserializationSchema =
                this.slsSourceConfig.getConsumerMetaData().getDeserializationSchema();
        // Collected first and marked finished afterwards, so a failing pull leaves no split
        // half-marked.
        List<SlsSourceSplit> pulledSplits = new CopyOnWriteArrayList<>();
        for (SlsSourceSplit split : this.sourceSplits) {
            if (pullOnce(split, deserializationSchema, collector)) {
                pulledSplits.add(split);
            }
        }

        if (Boundedness.BOUNDED.equals(this.context.getBoundedness())) {
            pulledSplits.forEach(split -> split.setFinish(true));
            if (this.sourceSplits.stream().allMatch(SlsSourceSplit::isFinish)) {
                log.info("sls batch mode finished");
                this.context.signalNoMoreElement();
            }
        }
    }

    /** Creates a consumer thread and submits it to the executor. */
    private SlsConsumerThread startConsumerThread() {
        SlsConsumerThread consumerThread = new SlsConsumerThread(this.slsSourceConfig);
        this.executorService.submit(consumerThread);
        return consumerThread;
    }

    /**
     * Queues a single pull for {@code split} on its consumer thread and waits for it to finish.
     *
     * @return {@code true} once the batch has been deserialized and the cursor advanced
     * @throws RuntimeException wrapping the failure if the pull fails or the wait is interrupted
     */
    private boolean pullOnce(
            SlsSourceSplit split,
            FastLogDeserialization<SeaTunnelRow> deserializationSchema,
            Collector<SeaTunnelRow> collector) {
        CompletableFuture<Boolean> pulled = new CompletableFuture<>();
        try {
            this.consumerThreadMap.get(split.splitId()).getTasks().put(client -> {
                try {
                    PullLogsResponse response = client.pullLogs(new PullLogsRequest(
                            split.getProject(),
                            split.getLogStore(),
                            split.getShardId().intValue(),
                            split.getFetchSize().intValue(),
                            split.getStartCursor()));
                    deserializationSchema.deserialize(response.getLogGroups(), collector);
                    split.setStartCursor(response.getNextCursor());
                    pulled.complete(Boolean.TRUE);
                } catch (LogException | IOException e) {
                    log.error("pull logs from sls failed, split: {}", split.splitId(), e);
                    pulled.completeExceptionally(e);
                    throw new RuntimeException(e);
                } catch (RuntimeException e) {
                    // Without this the waiting pollNext call would block forever on get().
                    pulled.completeExceptionally(e);
                    throw e;
                }
            });
            return pulled.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Records a copy of every current split under {@code checkpointId} and returns a second,
     * independent set of copies as the reader state.
     *
     * @param checkpointId id of the checkpoint being taken
     * @return copies of the splits currently owned by this reader
     */
    public List<SlsSourceSplit> snapshotState(long checkpointId) throws Exception {
        Map<String, SlsSourceSplit> offsets = this.sourceSplits.stream()
                .collect(Collectors.toMap(SlsSourceSplit::splitId, SlsSourceSplit::copy));
        this.checkpointOffsetMap.put(checkpointId, offsets);
        return this.sourceSplits.stream().map(SlsSourceSplit::copy).collect(Collectors.toList());
    }

    /**
     * Queues newly assigned splits; they are picked up by the next {@link #pollNext} call.
     *
     * @param splits splits assigned to this reader
     * @throws RuntimeException if the calling thread is interrupted while queueing
     */
    public void addSplits(List<SlsSourceSplit> splits) {
        try {
            for (SlsSourceSplit split : splits) {
                try {
                    this.pendingShardsQueue.put(split);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        } finally {
            // Set only after queueing, so a concurrent pollNext never runs with the new splits
            // still missing (in bounded mode an empty split set signals end of input).
            this.running = true;
        }
    }

    /** Logs that no further splits will be assigned; no state is changed. */
    public void handleNoMoreSplits() {
        log.info("receive no more splits message, this reader will not add new split.");
    }

    /**
     * Commits the cursors captured for {@code checkpointId} by queueing one update task per split
     * on that split's consumer thread. The commit runs asynchronously; failures are logged on
     * the consumer thread.
     *
     * @param checkpointId id of the checkpoint that completed
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // Single remove instead of containsKey + remove, so the lookup and removal are atomic.
        Map<String, SlsSourceSplit> offsets = this.checkpointOffsetMap.remove(checkpointId);
        if (offsets == null) {
            log.warn("checkpoint {} do not exist or have already been committed.", checkpointId);
            return;
        }
        offsets.forEach((splitId, split) -> {
            SlsConsumerThread consumerThread = this.consumerThreadMap.get(splitId);
            if (consumerThread == null) {
                log.warn("no consumer thread for split {}, skip committing cursor to sls", splitId);
                return;
            }
            try {
                consumerThread.getTasks().put(client -> {
                    try {
                        client.UpdateCheckPoint(
                                split.getProject(),
                                split.getLogStore(),
                                split.getConsumer(),
                                split.getShardId().intValue(),
                                split.getStartCursor());
                    } catch (LogException e) {
                        log.error("LogException: commit cursor to sls failed", e);
                        throw new RuntimeException(e);
                    }
                });
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("InterruptedException: commit cursor to sls failed", e);
            }
        });
    }
}
