package org.apache.iceberg.flink.maintenance.api;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.operators.util.OperatorValidationUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.maintenance.operator.LockRemover;
import org.apache.iceberg.flink.maintenance.operator.MonitorSource;
import org.apache.iceberg.flink.maintenance.operator.TableChange;
import org.apache.iceberg.flink.maintenance.operator.TriggerManager;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

/* loaded from: input_file:org/apache/iceberg/flink/maintenance/api/TableMaintenance.class */
public class TableMaintenance {
    // Operator display-name constants for the maintenance job graph. They are package-private,
    // presumably so that tests in the same package can look operators up by name.

    // Prefix of the monitor source operator name; the table name is appended to it.
    static final String SOURCE_OPERATOR_NAME_PREFIX = "Monitor source for ";
    // Name of the operator that runs the TriggerManager.
    static final String TRIGGER_MANAGER_OPERATOR_NAME = "Trigger manager";
    // Name of the operator that assigns timestamps and watermarks to the emitted triggers.
    static final String WATERMARK_ASSIGNER_OPERATOR_NAME = "Watermark Assigner";
    // Prefix of the per-task filter operator name; the task index is appended to it.
    static final String FILTER_OPERATOR_NAME_PREFIX = "Filter ";
    // Name of the operator that runs the LockRemover.
    static final String LOCK_REMOVER_OPERATOR_NAME = "Lock remover";

    /* loaded from: input_file:org/apache/iceberg/flink/maintenance/api/TableMaintenance$Builder.class */
    public static class Builder {
        // Environment used to create the monitor source; null when the builder was created by
        // forChangeStream(...).
        private final StreamExecutionEnvironment env;
        // Externally supplied stream of table changes; null when the builder was created by
        // forTable(...).
        private final DataStream<TableChange> inputStream;
        // Loader for the maintained table; append() works on a clone of it.
        private final TableLoader tableLoader;
        // Lock factory handed to the TriggerManager and the LockRemover.
        private final TriggerLockFactory lockFactory;
        // Suffix appended to every operator uid; defaults to a random, per-builder value.
        private String uidSuffix = "TableMaintenance-" + UUID.randomUUID();
        // Slot sharing group applied to every operator created by this builder.
        private String slotSharingGroup = "default";
        // Passed to the TriggerManager in milliseconds and used to derive the monitor source rate.
        private Duration rateLimit = Duration.ofMinutes(1);
        // Passed to the TriggerManager in milliseconds.
        private Duration lockCheckDelay = Duration.ofSeconds(30);
        // Parallelism handed to the maintenance tasks.
        // NOTE(review): -1 is presumably Flink's "use the default parallelism" sentinel - confirm.
        private int parallelism = -1;
        // Handed to the MonitorSource; only configurable when no change stream was supplied.
        private int maxReadBack = 100;
        // Maintenance tasks in registration order; the list index doubles as the task id.
        private final List<MaintenanceTaskBuilder<?>> taskBuilders = Lists.newArrayListWithCapacity(4);

        /**
         * Creates a builder. Only reachable through the static factories of the enclosing class,
         * which pass {@code null} for either the environment or the change stream.
         *
         * @param streamExecutionEnvironment environment used to create the monitor source, or null
         * @param dataStream externally provided change stream, or null
         * @param tableLoader loader for the table to maintain
         * @param triggerLockFactory factory for the locks guarding the maintenance tasks
         */
        private Builder(
                StreamExecutionEnvironment streamExecutionEnvironment,
                DataStream<TableChange> dataStream,
                TableLoader tableLoader,
                TriggerLockFactory triggerLockFactory) {
            this.tableLoader = tableLoader;
            this.lockFactory = triggerLockFactory;
            this.env = streamExecutionEnvironment;
            this.inputStream = dataStream;
        }

        /**
         * Sets the suffix that is appended to the uid of every operator created by this builder.
         *
         * @param str the new uid suffix
         * @return this builder, for chaining
         */
        public Builder uidSuffix(String str) {
            uidSuffix = str;
            return this;
        }

        /**
         * Sets the slot sharing group that is applied to every operator created by this builder.
         *
         * @param str the slot sharing group name
         * @return this builder, for chaining
         */
        public Builder slotSharingGroup(String str) {
            slotSharingGroup = str;
            return this;
        }

        /**
         * Sets the rate limit. The value is handed to the trigger manager in milliseconds and is
         * also used to derive the polling rate of the monitor source.
         *
         * @param duration the new rate limit; must be non-null and longer than zero milliseconds
         * @return this builder, for chaining
         * @throws NullPointerException if {@code duration} is null
         * @throws IllegalArgumentException if {@code duration} is not greater than zero milliseconds
         */
        public Builder rateLimit(Duration duration) {
            // Validate the incoming value, not the currently stored one. checkArgument is required
            // here: checkNotNull on a boxed boolean can never fail.
            Preconditions.checkNotNull(duration, "Rate limit should not be null");
            Preconditions.checkArgument(duration.toMillis() > 0, "Rate limit should be greater than 0");
            this.rateLimit = duration;
            return this;
        }

        /**
         * Sets the lock check delay, which is handed to the trigger manager in milliseconds.
         *
         * @param duration the new lock check delay
         * @return this builder, for chaining
         */
        public Builder lockCheckDelay(Duration duration) {
            lockCheckDelay = duration;
            return this;
        }

        /**
         * Sets the parallelism that is handed to every maintenance task when it is appended.
         *
         * @param i the new parallelism; checked by {@code OperatorValidationUtils.validateParallelism}
         * @return this builder, for chaining
         */
        public Builder parallelism(int i) {
            // Reject invalid values before storing anything.
            OperatorValidationUtils.validateParallelism(i);
            parallelism = i;
            return this;
        }

        /**
         * Sets the max read back value that is handed to the monitor source. It can only be set
         * when the builder creates its own monitor source, i.e. no change stream was provided.
         *
         * @param i the new max read back value
         * @return this builder, for chaining
         * @throws IllegalArgumentException if the builder was created with a change stream
         */
        public Builder maxReadBack(int i) {
            boolean ownsMonitorSource = this.inputStream == null;
            Preconditions.checkArgument(ownsMonitorSource, "Can't set maxReadBack when change stream is provided");
            maxReadBack = i;
            return this;
        }

        /**
         * Registers a maintenance task. Tasks keep their registration order, and the position in
         * that order is used as the task index when the job graph is built.
         *
         * @param maintenanceTaskBuilder builder of the task to schedule
         * @return this builder, for chaining
         */
        public Builder add(MaintenanceTaskBuilder<?> maintenanceTaskBuilder) {
            taskBuilders.add(maintenanceTaskBuilder);
            return this;
        }

        /**
         * Wires the configured maintenance flow into the job graph: change stream, trigger manager,
         * watermark assigner, one filter plus maintenance task per registered builder, and finally
         * the lock remover consuming the union of all task results.
         *
         * <p>A clone of the table loader is opened for the duration of this call and closed again
         * before returning, including when graph construction fails.
         *
         * @throws IOException if opening or closing the cloned table loader fails
         * @throws IllegalArgumentException if no maintenance task was added
         * @throws NullPointerException if the uid suffix is null
         */
        public void append() throws IOException {
            Preconditions.checkArgument(!this.taskBuilders.isEmpty(), "Provide at least one task");
            Preconditions.checkNotNull(this.uidSuffix, "Uid suffix should not be null");

            int taskCount = this.taskBuilders.size();
            List<String> taskNames = Lists.newArrayListWithCapacity(taskCount);
            // NOTE(review): left raw because the evaluator element type is not imported in this
            // file; give it a type argument once that type is in scope.
            List evaluators = Lists.newArrayListWithCapacity(taskCount);
            for (int i = 0; i < taskCount; i++) {
                MaintenanceTaskBuilder<?> taskBuilder = this.taskBuilders.get(i);
                taskNames.add(nameFor(taskBuilder, i));
                evaluators.add(taskBuilder.evaluator());
            }

            // try-with-resources closes the cloned loader on every exit path.
            try (TableLoader loader = this.tableLoader.m581clone()) {
                loader.open();
                String tableName = loader.loadTable().name();

                // All changes are keyed with the same constant key, so a single trigger manager
                // instance sees every change.
                SingleOutputStreamOperator<Trigger> triggers =
                        DataStreamUtils.reinterpretAsKeyedStream(changeStream(tableName, loader), unused -> true)
                                .process(new TriggerManager(loader, this.lockFactory, taskNames, evaluators, this.rateLimit.toMillis(), this.lockCheckDelay.toMillis()))
                                .name(TRIGGER_MANAGER_OPERATOR_NAME)
                                .uid(TRIGGER_MANAGER_OPERATOR_NAME + this.uidSuffix)
                                .slotSharingGroup(this.slotSharingGroup)
                                .forceNonParallel()
                                .assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy())
                                .name(WATERMARK_ASSIGNER_OPERATOR_NAME)
                                .uid(WATERMARK_ASSIGNER_OPERATOR_NAME + this.uidSuffix)
                                .slotSharingGroup(this.slotSharingGroup)
                                .forceNonParallel();

                DataStream<TaskResult> allResults = null;
                for (int i = 0; i < taskCount; i++) {
                    // Effectively final copy for the lambda. The lambda must capture only this
                    // int and never the builder itself.
                    int taskIndex = i;
                    SingleOutputStreamOperator<Trigger> taskTriggers =
                            triggers
                                    .filter(trigger -> trigger.taskId() != null && trigger.taskId().intValue() == taskIndex)
                                    .name(FILTER_OPERATOR_NAME_PREFIX + taskIndex)
                                    .forceNonParallel()
                                    .uid(FILTER_OPERATOR_NAME_PREFIX + taskIndex + "-" + this.uidSuffix)
                                    .slotSharingGroup(this.slotSharingGroup);

                    DataStream<TaskResult> taskResults =
                            this.taskBuilders.get(taskIndex).append(taskTriggers, taskIndex, taskNames.get(taskIndex), tableName, loader, this.uidSuffix, this.slotSharingGroup, this.parallelism);
                    allResults = allResults == null ? taskResults : allResults.union(taskResults);
                }

                // allResults is non-null here: at least one task builder is guaranteed above.
                allResults
                        .transform(LOCK_REMOVER_OPERATOR_NAME, TypeInformation.of(Void.class), new LockRemover(tableName, this.lockFactory, taskNames))
                        .forceNonParallel()
                        .uid("lock-remover-" + this.uidSuffix)
                        .slotSharingGroup(this.slotSharingGroup);
            }
        }

        /**
         * Returns the stream of table changes that feeds the trigger manager.
         *
         * <p>If a change stream was supplied to the builder, it is returned with global
         * partitioning. Otherwise a non-parallel monitor source is created on the execution
         * environment, rate limited according to the configured rate limit.
         *
         * @param str name of the table, used in the monitor source operator name
         * @param tableLoader loader handed to the monitor source
         * @return the change stream to consume
         */
        private DataStream<TableChange> changeStream(String str, TableLoader tableLoader) {
            if (this.inputStream != null) {
                return this.inputStream.global();
            }

            // Derive the rate from milliseconds. Duration.getSeconds() truncates, so a sub-second
            // rate limit would divide by zero (an infinite rate) and fractional seconds would be
            // silently dropped.
            double checksPerSecond = 1000.0d / this.rateLimit.toMillis();
            MonitorSource monitorSource =
                    new MonitorSource(tableLoader, RateLimiterStrategy.perSecond(checksPerSecond), this.maxReadBack);
            return this.env
                    .fromSource(monitorSource, WatermarkStrategy.noWatermarks(), SOURCE_OPERATOR_NAME_PREFIX + str)
                    .uid(SOURCE_OPERATOR_NAME_PREFIX + this.uidSuffix)
                    .slotSharingGroup(this.slotSharingGroup)
                    .forceNonParallel();
        }

        /**
         * Builds the display name of a maintenance task: the simple class name of its builder
         * followed by the task index in square brackets.
         *
         * @param maintenanceTaskBuilder builder of the task
         * @param i index of the task
         * @return the task name, in the form {@code "SimpleClassName [index]"}
         */
        private static String nameFor(MaintenanceTaskBuilder<?> maintenanceTaskBuilder, int i) {
            String builderType = maintenanceTaskBuilder.getClass().getSimpleName();
            return builderType + " [" + i + "]";
        }

        // Lambda deserialization hook, marked synthetic: it re-creates the two lambdas used in
        // append() (the constant key selector and the per-task trigger filter) from their
        // serialized form.
        //
        // NOTE(review): this looks like decompiler output of a compiler-generated method rather
        // than hand-written code. As rendered it is not valid Java ("boolean z = -1", a switch
        // over a boolean). The compiler presumably regenerates this method for serializable
        // lambdas, so consider removing it from maintained source - confirm before doing so.
        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            // Selector for the second switch; stays at its initial value when no known name matches.
            boolean z = -1;
            // Switch on the hash first, then confirm with equals().
            switch (implMethodName.hashCode()) {
                case 132293286:
                    if (implMethodName.equals("lambda$append$29ae9714$1")) {
                        z = false;
                        break;
                    }
                    break;
                case 1490935936:
                    if (implMethodName.equals("lambda$append$2d1eeef$1")) {
                        z = true;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    // Key selector lambda: maps every TableChange to the same key.
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/iceberg/flink/maintenance/api/TableMaintenance$Builder") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/iceberg/flink/maintenance/operator/TableChange;)Ljava/lang/Boolean;")) {
                        return tableChange -> {
                            return true;
                        };
                    }
                    break;
                case true:
                    // Filter lambda: keeps the triggers whose task id equals the captured index.
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/FilterFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("filter") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("org/apache/iceberg/flink/maintenance/api/TableMaintenance$Builder") && serializedLambda.getImplMethodSignature().equals("(ILorg/apache/iceberg/flink/maintenance/api/Trigger;)Z")) {
                        int intValue = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                        return trigger -> {
                            return trigger.taskId() != null && trigger.taskId().intValue() == intValue;
                        };
                    }
                    break;
            }
            // No known lambda matched the serialized description.
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    /**
     * Watermark strategy for the trigger stream: every trigger is stamped with its own timestamp
     * and immediately followed by a watermark carrying that same timestamp. Nothing is emitted
     * periodically.
     */
    @Internal
    /* loaded from: input_file:org/apache/iceberg/flink/maintenance/api/TableMaintenance$PunctuatedWatermarkStrategy.class */
    public static class PunctuatedWatermarkStrategy implements WatermarkStrategy<Trigger> {
        /** Creates a generator that emits one watermark per trigger and none periodically. */
        public WatermarkGenerator<Trigger> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Trigger>() {
                public void onEvent(Trigger event, long eventTimestamp, WatermarkOutput output) {
                    Watermark punctuation = new Watermark(event.timestamp());
                    output.emitWatermark(punctuation);
                }

                public void onPeriodicEmit(WatermarkOutput output) {
                    // Intentionally empty: watermarks are emitted per event only.
                }
            };
        }

        /** Creates an assigner that uses the trigger's own timestamp as the record timestamp. */
        public TimestampAssigner<Trigger> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (event, recordTimestamp) -> event.timestamp();
        }
    }

    /** Not instantiable; builders are obtained through the static factory methods. */
    private TableMaintenance() {
        // Intentionally empty.
    }

    /**
     * Creates a builder that consumes an externally provided stream of table changes instead of
     * creating its own monitor source.
     *
     * @param dataStream the stream of table changes; must not be null
     * @param tableLoader loader for the table to maintain; must not be null
     * @param triggerLockFactory factory for the maintenance locks; must not be null
     * @return a new builder without an execution environment
     * @throws NullPointerException if any argument is null
     */
    @Internal
    public static Builder forChangeStream(DataStream<TableChange> dataStream, TableLoader tableLoader, TriggerLockFactory triggerLockFactory) {
        // Arguments are evaluated left to right, so the null checks run in declaration order.
        return new Builder(
                null,
                Preconditions.checkNotNull(dataStream, "The change stream should not be null"),
                Preconditions.checkNotNull(tableLoader, "TableLoader should not be null"),
                Preconditions.checkNotNull(triggerLockFactory, "LockFactory should not be null"));
    }

    /**
     * Creates a builder that monitors the table itself: the change stream is produced by a
     * monitor source created on the given execution environment.
     *
     * @param streamExecutionEnvironment environment used to create the monitor source; must not be null
     * @param tableLoader loader for the table to maintain; must not be null
     * @param triggerLockFactory factory for the maintenance locks; must not be null
     * @return a new builder without an external change stream
     * @throws NullPointerException if any argument is null
     */
    public static Builder forTable(StreamExecutionEnvironment streamExecutionEnvironment, TableLoader tableLoader, TriggerLockFactory triggerLockFactory) {
        // Arguments are evaluated left to right, so the null checks run in declaration order.
        return new Builder(
                Preconditions.checkNotNull(streamExecutionEnvironment, "StreamExecutionEnvironment should not be null"),
                null,
                Preconditions.checkNotNull(tableLoader, "TableLoader should not be null"),
                Preconditions.checkNotNull(triggerLockFactory, "LockFactory should not be null"));
    }
}
