package org.apache.seatunnel.translation.flink.source;

import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext;
import org.apache.seatunnel.api.common.metrics.AbstractMetricsContext;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.event.DefaultEventProcessor;
import org.apache.seatunnel.api.event.EventListener;
import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/translation/flink/source/FlinkSourceSplitEnumeratorContext.class */
public class FlinkSourceSplitEnumeratorContext<SplitT extends SourceSplit> implements SourceSplitEnumerator.Context<SplitT> {
    private static final Logger log = LoggerFactory.getLogger(FlinkSourceSplitEnumeratorContext.class);
    private final SplitEnumeratorContext<SplitWrapper<SplitT>> enumContext;
    protected final EventListener eventListener;

    public FlinkSourceSplitEnumeratorContext(SplitEnumeratorContext<SplitWrapper<SplitT>> splitEnumeratorContext) {
        this.enumContext = splitEnumeratorContext;
        this.eventListener = new DefaultEventProcessor(getFlinkJobId(splitEnumeratorContext));
    }

    public int currentParallelism() {
        return this.enumContext.currentParallelism();
    }

    public Set<Integer> registeredReaders() {
        return this.enumContext.registeredReaders().keySet();
    }

    public void assignSplit(int i, List<SplitT> list) {
        list.forEach(sourceSplit -> {
            this.enumContext.assignSplit(new SplitWrapper(sourceSplit), i);
        });
    }

    public void signalNoMoreSplits(int i) {
        this.enumContext.signalNoMoreSplits(i);
    }

    public void sendEventToSourceReader(int i, SourceEvent sourceEvent) {
        this.enumContext.sendEventToSourceReader(i, new SourceEventWrapper(sourceEvent));
    }

    public MetricsContext getMetricsContext() {
        return new AbstractMetricsContext() { // from class: org.apache.seatunnel.translation.flink.source.FlinkSourceSplitEnumeratorContext.1
        };
    }

    public EventListener getEventListener() {
        return this.eventListener;
    }

    private static String getFlinkJobId(SplitEnumeratorContext splitEnumeratorContext) {
        try {
            return getJobIdForV15(splitEnumeratorContext);
        } catch (Exception e) {
            log.warn("Get flink job id failed", e);
            return null;
        }
    }

    private static String getJobIdForV15(SplitEnumeratorContext splitEnumeratorContext) {
        try {
            SourceCoordinatorContext sourceCoordinatorContext = (SourceCoordinatorContext) splitEnumeratorContext;
            Field declaredField = sourceCoordinatorContext.getClass().getDeclaredField("operatorCoordinatorContext");
            declaredField.setAccessible(true);
            OperatorCoordinator.Context context = (OperatorCoordinator.Context) declaredField.get(sourceCoordinatorContext);
            Field[] declaredFields = context.getClass().getDeclaredFields();
            if (!Arrays.stream(declaredFields).filter(field -> {
                return field.getName().equals("globalFailureHandler");
            }).findFirst().isPresent()) {
                Field field2 = (Field) Arrays.stream(declaredFields).filter(field3 -> {
                    return field3.getName().equals("context");
                }).findFirst().get();
                field2.setAccessible(true);
                context = (OperatorCoordinator.Context) field2.get(context);
            }
            Field field4 = (Field) Arrays.stream(context.getClass().getDeclaredFields()).filter(field5 -> {
                return field5.getName().equals("globalFailureHandler");
            }).findFirst().get();
            field4.setAccessible(true);
            Object obj = field4.get(context);
            Field field6 = (Field) Arrays.stream(obj.getClass().getDeclaredFields()).filter(field7 -> {
                return field7.getName().equals("arg$1");
            }).findFirst().get();
            field6.setAccessible(true);
            return ((SchedulerBase) field6.get(obj)).getExecutionGraph().getJobID().toString();
        } catch (Exception e) {
            throw new IllegalStateException("Initialize flink job-id failed", e);
        }
    }
}
