package org.apache.iceberg.flink.sink.shuffle;

import java.util.Map;
import java.util.Objects;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortKey;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.RowDataWrapper;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/**
 * Stream operator that collects data statistics on the sort key of every incoming record and
 * forwards each record downstream wrapped as a {@link StatisticsOrRecord}.
 *
 * <p>On every checkpoint the subtask's local statistics are sent to the operator coordinator and
 * then reset. Global statistics received from the coordinator are emitted downstream as
 * statistics elements and, by subtask 0 only, stored in union list state.
 */
@Internal
public class DataStatisticsOperator extends AbstractStreamOperator<StatisticsOrRecord>
        implements OneInputStreamOperator<RowData, StatisticsOrRecord>, OperatorEventHandler {
    private static final long serialVersionUID = 1L;

    /**
     * Number of distinct keys in Map-type local statistics above which an operator configured with
     * {@code StatisticsType.Auto} switches its local statistics to Sketch.
     */
    private static final int SKETCH_SWITCH_THRESHOLD = 10_000;

    private final String operatorName;
    private final RowDataWrapper rowDataWrapper;
    private final SortKey sortKey;
    private final OperatorEventGateway operatorEventGateway;
    private final int downstreamParallelism;
    private final StatisticsType statisticsType;
    private final TypeSerializer<DataStatistics> taskStatisticsSerializer;
    private final TypeSerializer<GlobalStatistics> globalStatisticsSerializer;

    private transient int parallelism;
    private transient int subtaskIndex;
    private transient ListState<GlobalStatistics> globalStatisticsState;
    // Effective local statistics type; may move from Map to Sketch at runtime when configured Auto.
    private volatile transient StatisticsType taskStatisticsType;
    private volatile transient DataStatistics localStatistics;
    private volatile transient GlobalStatistics globalStatistics;

    /**
     * Creates the operator.
     *
     * @param operatorName name used in log and error messages
     * @param schema Iceberg schema of the incoming rows; also used to build the sort key
     * @param sortOrder sort order whose key values are collected as statistics
     * @param operatorEventGateway gateway used to send events to the operator coordinator
     * @param downstreamParallelism passed to {@code StatisticsUtil.createTaskStatistics}
     *     (presumably the parallelism of the downstream writer — confirm at call site)
     * @param statisticsType configured statistics type; {@code Auto} enables Map-to-Sketch migration
     */
    public DataStatisticsOperator(
            String operatorName,
            Schema schema,
            SortOrder sortOrder,
            OperatorEventGateway operatorEventGateway,
            int downstreamParallelism,
            StatisticsType statisticsType) {
        this.operatorName = operatorName;
        this.rowDataWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(schema), schema.asStruct());
        this.sortKey = new SortKey(schema, sortOrder);
        this.operatorEventGateway = operatorEventGateway;
        this.downstreamParallelism = downstreamParallelism;
        this.statisticsType = statisticsType;

        // Task-level and global statistics serializers share one sort-key serializer.
        SortKeySerializer sortKeySerializer = new SortKeySerializer(schema, sortOrder);
        this.taskStatisticsSerializer = new DataStatisticsSerializer(sortKeySerializer);
        this.globalStatisticsSerializer = new GlobalStatisticsSerializer(sortKeySerializer);
    }

    /**
     * Registers the global statistics state, restores global statistics from it when the job is
     * restored (and then requests fresh statistics from the coordinator), and creates the initial
     * local statistics.
     */
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        this.parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        this.subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();

        // Union list state: on restore every subtask receives all stored entries, so subtasks
        // added by a scale-up can also recover the global statistics.
        this.globalStatisticsState =
                context.getOperatorStateStore()
                        .getUnionListState(
                                new ListStateDescriptor<>("globalStatisticsState", globalStatisticsSerializer));

        if (context.isRestored()) {
            Iterable<GlobalStatistics> restored = globalStatisticsState.get();
            if (restored == null || !restored.iterator().hasNext()) {
                LOG.info(
                        "Operator {} subtask {} doesn't have global statistics state to restore",
                        operatorName,
                        subtaskIndex);
            } else {
                GlobalStatistics restoredStatistics = restored.iterator().next();
                LOG.info(
                        "Operator {} subtask {} restored global statistics state",
                        operatorName,
                        subtaskIndex);
                this.globalStatistics = restoredStatistics;
            }

            // Always ask the coordinator for fresh global statistics after a restore. The hash of
            // the restored statistics (if any) is included — presumably so the coordinator can skip
            // responding when this subtask already holds the latest version.
            LOG.info(
                    "Operator {} subtask {} requests new global statistics from coordinator ",
                    operatorName,
                    subtaskIndex);
            RequestGlobalStatisticsEvent request =
                    globalStatistics != null
                            ? new RequestGlobalStatisticsEvent(globalStatistics.hashCode())
                            : new RequestGlobalStatisticsEvent();
            operatorEventGateway.sendEventToCoordinator(request);
        }

        this.taskStatisticsType = StatisticsUtil.collectType(statisticsType, globalStatistics);
        this.localStatistics =
                StatisticsUtil.createTaskStatistics(taskStatisticsType, parallelism, downstreamParallelism);
    }

    /** Emits restored global statistics (if any) downstream before records start flowing. */
    @Override
    public void open() throws Exception {
        if (globalStatistics != null) {
            output.collect(new StreamRecord<>(StatisticsOrRecord.fromStatistics(globalStatistics)));
        }
    }

    /**
     * Accepts global statistics sent by the coordinator.
     *
     * @throws IllegalArgumentException if the event is not a {@link StatisticsEvent}
     */
    @Override
    public void handleOperatorEvent(OperatorEvent event) {
        // Template overload: the message is only formatted when the check actually fails.
        Preconditions.checkArgument(
                event instanceof StatisticsEvent,
                "Operator %s subtask %s received unexpected operator event %s",
                operatorName,
                subtaskIndex,
                event.getClass());
        StatisticsEvent statisticsEvent = (StatisticsEvent) event;
        LOG.info(
                "Operator {} subtask {} received global data event from coordinator checkpoint {}",
                operatorName,
                subtaskIndex,
                statisticsEvent.checkpointId());
        this.globalStatistics =
                StatisticsUtil.deserializeGlobalStatistics(
                        statisticsEvent.statisticsBytes(), globalStatisticsSerializer);
        checkStatisticsTypeMigration();

        // Otherwise the new statistics are emitted downstream at the next snapshotState call.
        if (statisticsEvent.applyImmediately()) {
            output.collect(new StreamRecord<>(StatisticsOrRecord.fromStatistics(globalStatistics)));
        }
    }

    /** Adds the record's sort key to the local statistics and forwards the record downstream. */
    @Override
    public void processElement(StreamRecord<RowData> streamRecord) {
        RowData record = streamRecord.getValue();
        sortKey.wrap(rowDataWrapper.wrap(record));
        localStatistics.add(sortKey);
        checkStatisticsTypeMigration();
        output.collect(new StreamRecord<>(StatisticsOrRecord.fromRecord(record)));
    }

    /**
     * On each checkpoint: re-emits current global statistics downstream, lets subtask 0 persist
     * them to state, sends local statistics to the coordinator and starts fresh local statistics.
     */
    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        long checkpointId = context.getCheckpointId();
        LOG.info(
                "Operator {} subtask {} snapshotting data statistics for checkpoint {}",
                operatorName,
                subtaskIndex,
                checkpointId);

        if (globalStatistics != null) {
            output.collect(new StreamRecord<>(StatisticsOrRecord.fromStatistics(globalStatistics)));
        }

        // Only subtask 0 writes the union state, so the restored union holds a single copy of the
        // global statistics instead of one copy per subtask.
        if (globalStatistics != null && subtaskIndex == 0) {
            globalStatisticsState.clear();
            LOG.info(
                    "Operator {} subtask {} saving global statistics to state", operatorName, subtaskIndex);
            globalStatisticsState.add(globalStatistics);
            LOG.debug(
                    "Operator {} subtask {} saved global statistics to state: {}",
                    operatorName,
                    subtaskIndex,
                    globalStatistics);
        }

        LOG.info(
                "Operator {} Subtask {} sending local statistics to coordinator for checkpoint {}",
                operatorName,
                subtaskIndex,
                checkpointId);
        operatorEventGateway.sendEventToCoordinator(
                StatisticsEvent.createTaskStatisticsEvent(
                        checkpointId, localStatistics, taskStatisticsSerializer));

        // Local statistics cover one checkpoint interval; start over after reporting them.
        this.localStatistics =
                StatisticsUtil.createTaskStatistics(taskStatisticsType, parallelism, downstreamParallelism);
    }

    /**
     * For {@code Auto} statistics currently collected as a Map, switches local statistics to Sketch
     * once the Map exceeds {@link #SKETCH_SWITCH_THRESHOLD} keys or the global statistics are
     * already Sketch-typed, carrying the existing Map entries over into the new Sketch.
     */
    @SuppressWarnings("unchecked")
    private void checkStatisticsTypeMigration() {
        if (statisticsType != StatisticsType.Auto || localStatistics.type() != StatisticsType.Map) {
            return;
        }

        Map<SortKey, Long> mapStatistics = (Map<SortKey, Long>) localStatistics.result();
        boolean globalIsSketch =
                globalStatistics != null && globalStatistics.type() == StatisticsType.Sketch;
        if (mapStatistics.size() > SKETCH_SWITCH_THRESHOLD || globalIsSketch) {
            LOG.info(
                    "Operator {} subtask {} switched local statistics from Map to Sketch.",
                    operatorName,
                    subtaskIndex);
            this.taskStatisticsType = StatisticsType.Sketch;
            DataStatistics sketchStatistics =
                    StatisticsUtil.createTaskStatistics(taskStatisticsType, parallelism, downstreamParallelism);
            this.localStatistics = sketchStatistics;
            SketchUtil.convertMapToSketch(mapStatistics, sketchStatistics::add);
        }
    }

    @VisibleForTesting
    DataStatistics localStatistics() {
        return localStatistics;
    }

    @VisibleForTesting
    GlobalStatistics globalStatistics() {
        return globalStatistics;
    }
}
