package org.apache.flink.streaming.api.functions.sink.filesystem;

import java.io.IOException;
import java.io.Serializable;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.RowWisePartWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;

@PublicEvolving
/* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.class */
public class StreamingFileSink<IN> extends RichSinkFunction<IN> implements CheckpointedFunction, CheckpointListener, ProcessingTimeCallback {
    private static final long serialVersionUID = 1;
    private static final ListStateDescriptor<byte[]> BUCKET_STATE_DESC = new ListStateDescriptor<>("bucket-states", BytePrimitiveArraySerializer.INSTANCE);
    private static final ListStateDescriptor<Long> MAX_PART_COUNTER_STATE_DESC = new ListStateDescriptor<>("max-part-counter", LongSerializer.INSTANCE);
    private final long bucketCheckInterval;
    private final BucketsBuilder<IN, ?> bucketsBuilder;
    private transient Buckets<IN, ?> buckets;
    private transient ProcessingTimeService processingTimeService;
    private transient ListState<byte[]> bucketStates;
    private transient ListState<Long> maxPartCountersState;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink$BucketsBuilder.class */
    public static abstract class BucketsBuilder<IN, BucketID> implements Serializable {
        private static final long serialVersionUID = 1;

        private BucketsBuilder() {
        }

        abstract Buckets<IN, BucketID> createBuckets(int i) throws IOException;
    }

    @PublicEvolving
    /* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink$BulkFormatBuilder.class */
    public static class BulkFormatBuilder<IN, BucketID> extends BucketsBuilder<IN, BucketID> {
        private static final long serialVersionUID = 1;
        private final long bucketCheckInterval;
        private final Path basePath;
        private final BulkWriter.Factory<IN> writerFactory;
        private final BucketAssigner<IN, BucketID> bucketAssigner;
        private final BucketFactory<IN, BucketID> bucketFactory;

        BulkFormatBuilder(Path path, BulkWriter.Factory<IN> factory, BucketAssigner<IN, BucketID> bucketAssigner) {
            this(path, factory, bucketAssigner, 60000L, new DefaultBucketFactoryImpl());
        }

        private BulkFormatBuilder(Path path, BulkWriter.Factory<IN> factory, BucketAssigner<IN, BucketID> bucketAssigner, long j, BucketFactory<IN, BucketID> bucketFactory) {
            super();
            this.basePath = (Path) Preconditions.checkNotNull(path);
            this.writerFactory = factory;
            this.bucketAssigner = (BucketAssigner) Preconditions.checkNotNull(bucketAssigner);
            this.bucketCheckInterval = j;
            this.bucketFactory = (BucketFactory) Preconditions.checkNotNull(bucketFactory);
        }

        public BulkFormatBuilder<IN, BucketID> withBucketCheckInterval(long j) {
            return new BulkFormatBuilder<>(this.basePath, this.writerFactory, this.bucketAssigner, j, this.bucketFactory);
        }

        public <ID> BulkFormatBuilder<IN, ID> withBucketAssigner(BucketAssigner<IN, ID> bucketAssigner) {
            return new BulkFormatBuilder<>(this.basePath, this.writerFactory, (BucketAssigner) Preconditions.checkNotNull(bucketAssigner), this.bucketCheckInterval, new DefaultBucketFactoryImpl());
        }

        @VisibleForTesting
        BulkFormatBuilder<IN, BucketID> withBucketFactory(BucketFactory<IN, BucketID> bucketFactory) {
            return new BulkFormatBuilder<>(this.basePath, this.writerFactory, this.bucketAssigner, this.bucketCheckInterval, (BucketFactory) Preconditions.checkNotNull(bucketFactory));
        }

        public StreamingFileSink<IN> build() {
            return new StreamingFileSink<>(this, this.bucketCheckInterval);
        }

        @Override // org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.BucketsBuilder
        Buckets<IN, BucketID> createBuckets(int i) throws IOException {
            return new Buckets<>(this.basePath, this.bucketAssigner, this.bucketFactory, new BulkPartWriter.Factory(this.writerFactory), OnCheckpointRollingPolicy.build(), i);
        }
    }

    @PublicEvolving
    /* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink$RowFormatBuilder.class */
    public static class RowFormatBuilder<IN, BucketID> extends BucketsBuilder<IN, BucketID> {
        private static final long serialVersionUID = 1;
        private final long bucketCheckInterval;
        private final Path basePath;
        private final Encoder<IN> encoder;
        private final BucketAssigner<IN, BucketID> bucketAssigner;
        private final RollingPolicy<IN, BucketID> rollingPolicy;
        private final BucketFactory<IN, BucketID> bucketFactory;

        RowFormatBuilder(Path path, Encoder<IN> encoder, BucketAssigner<IN, BucketID> bucketAssigner) {
            this(path, encoder, bucketAssigner, DefaultRollingPolicy.create().build(), 60000L, new DefaultBucketFactoryImpl());
        }

        private RowFormatBuilder(Path path, Encoder<IN> encoder, BucketAssigner<IN, BucketID> bucketAssigner, RollingPolicy<IN, BucketID> rollingPolicy, long j, BucketFactory<IN, BucketID> bucketFactory) {
            super();
            this.basePath = (Path) Preconditions.checkNotNull(path);
            this.encoder = (Encoder) Preconditions.checkNotNull(encoder);
            this.bucketAssigner = (BucketAssigner) Preconditions.checkNotNull(bucketAssigner);
            this.rollingPolicy = (RollingPolicy) Preconditions.checkNotNull(rollingPolicy);
            this.bucketCheckInterval = j;
            this.bucketFactory = (BucketFactory) Preconditions.checkNotNull(bucketFactory);
        }

        public RowFormatBuilder<IN, BucketID> withBucketCheckInterval(long j) {
            return new RowFormatBuilder<>(this.basePath, this.encoder, this.bucketAssigner, this.rollingPolicy, j, this.bucketFactory);
        }

        public RowFormatBuilder<IN, BucketID> withBucketAssigner(BucketAssigner<IN, BucketID> bucketAssigner) {
            return new RowFormatBuilder<>(this.basePath, this.encoder, (BucketAssigner) Preconditions.checkNotNull(bucketAssigner), this.rollingPolicy, this.bucketCheckInterval, this.bucketFactory);
        }

        public RowFormatBuilder<IN, BucketID> withRollingPolicy(RollingPolicy<IN, BucketID> rollingPolicy) {
            return new RowFormatBuilder<>(this.basePath, this.encoder, this.bucketAssigner, (RollingPolicy) Preconditions.checkNotNull(rollingPolicy), this.bucketCheckInterval, this.bucketFactory);
        }

        public <ID> RowFormatBuilder<IN, ID> withBucketAssignerAndPolicy(BucketAssigner<IN, ID> bucketAssigner, RollingPolicy<IN, ID> rollingPolicy) {
            return new RowFormatBuilder<>(this.basePath, this.encoder, (BucketAssigner) Preconditions.checkNotNull(bucketAssigner), (RollingPolicy) Preconditions.checkNotNull(rollingPolicy), this.bucketCheckInterval, new DefaultBucketFactoryImpl());
        }

        public StreamingFileSink<IN> build() {
            return new StreamingFileSink<>(this, this.bucketCheckInterval);
        }

        @Override // org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.BucketsBuilder
        Buckets<IN, BucketID> createBuckets(int i) throws IOException {
            return new Buckets<>(this.basePath, this.bucketAssigner, this.bucketFactory, new RowWisePartWriter.Factory(this.encoder), this.rollingPolicy, i);
        }

        @VisibleForTesting
        RowFormatBuilder<IN, BucketID> withBucketFactory(BucketFactory<IN, BucketID> bucketFactory) {
            return new RowFormatBuilder<>(this.basePath, this.encoder, this.bucketAssigner, this.rollingPolicy, this.bucketCheckInterval, (BucketFactory) Preconditions.checkNotNull(bucketFactory));
        }
    }

    private StreamingFileSink(BucketsBuilder<IN, ?> bucketsBuilder, long j) {
        Preconditions.checkArgument(j > 0);
        this.bucketsBuilder = (BucketsBuilder) Preconditions.checkNotNull(bucketsBuilder);
        this.bucketCheckInterval = j;
    }

    public static <IN> RowFormatBuilder<IN, String> forRowFormat(Path path, Encoder<IN> encoder) {
        return new RowFormatBuilder<>(path, encoder, new DateTimeBucketAssigner());
    }

    public static <IN> BulkFormatBuilder<IN, String> forBulkFormat(Path path, BulkWriter.Factory<IN> factory) {
        return new BulkFormatBuilder<>(path, factory, new DateTimeBucketAssigner());
    }

    @Override // org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        this.buckets = this.bucketsBuilder.createBuckets(getRuntimeContext().getIndexOfThisSubtask());
        OperatorStateStore operatorStateStore = functionInitializationContext.getOperatorStateStore();
        this.bucketStates = operatorStateStore.getListState(BUCKET_STATE_DESC);
        this.maxPartCountersState = operatorStateStore.getUnionListState(MAX_PART_COUNTER_STATE_DESC);
        if (functionInitializationContext.isRestored()) {
            this.buckets.initializeState(this.bucketStates, this.maxPartCountersState);
        }
    }

    @Override // org.apache.flink.runtime.state.CheckpointListener
    public void notifyCheckpointComplete(long j) throws Exception {
        this.buckets.commitUpToCheckpoint(j);
    }

    @Override // org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        Preconditions.checkState((this.bucketStates == null || this.maxPartCountersState == null) ? false : true, "sink has not been initialized");
        this.buckets.snapshotState(functionSnapshotContext.getCheckpointId(), this.bucketStates, this.maxPartCountersState);
    }

    @Override // org.apache.flink.api.common.functions.AbstractRichFunction, org.apache.flink.api.common.functions.RichFunction
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.processingTimeService = ((StreamingRuntimeContext) getRuntimeContext()).getProcessingTimeService();
        this.processingTimeService.registerTimer(this.processingTimeService.getCurrentProcessingTime() + this.bucketCheckInterval, this);
    }

    @Override // org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback
    public void onProcessingTime(long j) throws Exception {
        long currentProcessingTime = this.processingTimeService.getCurrentProcessingTime();
        this.buckets.onProcessingTime(currentProcessingTime);
        this.processingTimeService.registerTimer(currentProcessingTime + this.bucketCheckInterval, this);
    }

    @Override // org.apache.flink.streaming.api.functions.sink.SinkFunction
    public void invoke(IN in, SinkFunction.Context context) throws Exception {
        this.buckets.onElement(in, context);
    }

    @Override // org.apache.flink.api.common.functions.AbstractRichFunction, org.apache.flink.api.common.functions.RichFunction
    public void close() throws Exception {
        if (this.buckets != null) {
            this.buckets.close();
        }
    }
}
