package org.apache.seatunnel.translation.flink.sink;

import java.lang.reflect.Field;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.event.DefaultEventProcessor;
import org.apache.seatunnel.api.event.EventListener;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.translation.flink.metric.FlinkMetricContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterContext.class */
public class FlinkSinkWriterContext implements SinkWriter.Context {
    private static final Logger log = LoggerFactory.getLogger(FlinkSinkWriterContext.class);
    private final Sink.InitContext writerContext;
    private final EventListener eventListener;

    public FlinkSinkWriterContext(Sink.InitContext initContext) {
        this.writerContext = initContext;
        this.eventListener = new DefaultEventProcessor(getFlinkJobId(initContext));
    }

    @Override // org.apache.seatunnel.api.sink.SinkWriter.Context
    public int getIndexOfSubtask() {
        return this.writerContext.getSubtaskId();
    }

    @Override // org.apache.seatunnel.api.sink.SinkWriter.Context
    public MetricsContext getMetricsContext() {
        return new FlinkMetricContext(getStreamingRuntimeContextForV15(this.writerContext));
    }

    @Override // org.apache.seatunnel.api.sink.SinkWriter.Context
    public EventListener getEventListener() {
        return this.eventListener;
    }

    private static String getFlinkJobId(Sink.InitContext initContext) {
        try {
            return getStreamingRuntimeContextForV15(initContext).getJobId().toString();
        } catch (Exception e) {
            log.warn("Get flink job id failed", e);
            return null;
        }
    }

    private static StreamingRuntimeContext getStreamingRuntimeContextForV15(Sink.InitContext initContext) {
        try {
            Field declaredField = initContext.getClass().getDeclaredField("context");
            declaredField.setAccessible(true);
            Object obj = declaredField.get(initContext);
            Field declaredField2 = obj.getClass().getDeclaredField("runtimeContext");
            declaredField2.setAccessible(true);
            return (StreamingRuntimeContext) declaredField2.get(obj);
        } catch (Exception e) {
            throw new IllegalStateException("Initialize flink context failed", e);
        }
    }
}
