package org.apache.seatunnel.translation.flink.source;

import java.lang.reflect.Field;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.event.DefaultEventProcessor;
import org.apache.seatunnel.api.event.EventListener;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.translation.flink.metric.FlinkMetricContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/translation/flink/source/FlinkSourceReaderContext.class */
public class FlinkSourceReaderContext implements SourceReader.Context {
    private static final Logger log = LoggerFactory.getLogger(FlinkSourceReaderContext.class);
    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkSourceReaderContext.class);
    private final AtomicBoolean isSendNoMoreElementEvent = new AtomicBoolean(false);
    private final SourceReaderContext readerContext;
    private final SeaTunnelSource source;
    protected final EventListener eventListener;

    public FlinkSourceReaderContext(SourceReaderContext sourceReaderContext, SeaTunnelSource seaTunnelSource) {
        this.readerContext = sourceReaderContext;
        this.source = seaTunnelSource;
        this.eventListener = new DefaultEventProcessor(getFlinkJobId(sourceReaderContext));
    }

    @Override // org.apache.seatunnel.api.source.SourceReader.Context
    public int getIndexOfSubtask() {
        return this.readerContext.getIndexOfSubtask();
    }

    @Override // org.apache.seatunnel.api.source.SourceReader.Context
    public Boundedness getBoundedness() {
        return this.source.getBoundedness();
    }

    @Override // org.apache.seatunnel.api.source.SourceReader.Context
    public void signalNoMoreElement() {
        if (this.isSendNoMoreElementEvent.get()) {
            return;
        }
        LOGGER.info("Reader [{}] send no more element event to enumerator", Integer.valueOf(this.readerContext.getIndexOfSubtask()));
        this.isSendNoMoreElementEvent.compareAndSet(false, true);
        this.readerContext.sendSourceEventToCoordinator(new NoMoreElementEvent(this.readerContext.getIndexOfSubtask()));
    }

    @Override // org.apache.seatunnel.api.source.SourceReader.Context
    public void sendSplitRequest() {
        this.readerContext.sendSplitRequest();
    }

    @Override // org.apache.seatunnel.api.source.SourceReader.Context
    public void sendSourceEventToEnumerator(SourceEvent sourceEvent) {
        this.readerContext.sendSourceEventToCoordinator(new SourceEventWrapper(sourceEvent));
    }

    @Override // org.apache.seatunnel.api.source.SourceReader.Context
    public MetricsContext getMetricsContext() {
        return new FlinkMetricContext(getStreamingRuntimeContext(this.readerContext));
    }

    public boolean isSendNoMoreElementEvent() {
        return this.isSendNoMoreElementEvent.get();
    }

    @Override // org.apache.seatunnel.api.source.SourceReader.Context
    public EventListener getEventListener() {
        return this.eventListener;
    }

    private static String getFlinkJobId(SourceReaderContext sourceReaderContext) {
        try {
            return getStreamingRuntimeContext(sourceReaderContext).getJobId().toString();
        } catch (Exception e) {
            log.warn("Get flink job id failed", e);
            return null;
        }
    }

    private static StreamingRuntimeContext getStreamingRuntimeContext(SourceReaderContext sourceReaderContext) {
        try {
            Field declaredField = sourceReaderContext.getClass().getDeclaredField("this$0");
            declaredField.setAccessible(true);
            return ((AbstractStreamOperator) declaredField.get(sourceReaderContext)).getRuntimeContext();
        } catch (Exception e) {
            throw new IllegalStateException("Initialize flink context failed", e);
        }
    }
}
