/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.sink.filesystem;

import java.io.IOException;
import java.io.Serializable;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketFactory;
import org.apache.flink.streaming.api.functions.sink.filesystem.Buckets;
import org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.RowWisePartWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;

/**
 * Sink function that forwards every incoming element to a {@code Buckets} instance and ties the
 * lifecycle of those buckets to checkpoints and to a recurring processing-time timer.
 *
 * <p>Instances are obtained through {@link #forRowFormat(Path, Encoder)} or
 * {@link #forBulkFormat(Path, BulkWriter.Factory)}, which return immutable builders; each
 * {@code withXyz} call on a builder returns a new builder and leaves the receiver untouched.
 *
 * <p>The sink itself only keeps the serializable builder and the check interval. The actual
 * {@code Buckets} object is transient and is created from the builder in
 * {@link #initializeState(FunctionInitializationContext)}.
 *
 * @param <IN> type of the elements written by this sink
 */
@PublicEvolving
public class StreamingFileSink<IN>
        extends RichSinkFunction<IN>
        implements CheckpointedFunction, CheckpointListener, ProcessingTimeCallback {

    private static final long serialVersionUID = 1L;

    /**
     * Bucket check interval used by both builders unless overridden. The value is added to the
     * current processing time when the timer is registered (presumably milliseconds, i.e. one
     * minute; confirm against {@code ProcessingTimeService}).
     */
    private static final long DEFAULT_BUCKET_CHECK_INTERVAL = 60000L;

    /** Descriptor of the operator list state ("bucket-states") holding serialized bucket states. */
    private static final ListStateDescriptor<byte[]> BUCKET_STATE_DESC =
            new ListStateDescriptor<>("bucket-states", BytePrimitiveArraySerializer.INSTANCE);

    /** Descriptor of the union list state ("max-part-counter") holding part counters. */
    private static final ListStateDescriptor<Long> MAX_PART_COUNTER_STATE_DESC =
            new ListStateDescriptor<>("max-part-counter", LongSerializer.INSTANCE);

    /** Offset between two consecutive processing-time callbacks; validated to be positive. */
    private final long bucketCheckInterval;

    /** Serializable recipe from which the transient {@link #buckets} are created. */
    private final BucketsBuilder<IN, ?> bucketsBuilder;

    // Runtime-only members, (re)created in initializeState() and open().
    private transient Buckets<IN, ?> buckets;
    private transient ProcessingTimeService processingTimeService;
    private transient ListState<byte[]> bucketStates;
    private transient ListState<Long> maxPartCountersState;

    /**
     * Creates the sink. Only reachable through the builders' {@code build()} methods.
     *
     * @param bucketsBuilder builder used later to create the buckets; must not be {@code null}
     * @param bucketCheckInterval interval between processing-time callbacks; must be positive
     * @throws IllegalArgumentException if {@code bucketCheckInterval} is not positive
     * @throws NullPointerException if {@code bucketsBuilder} is {@code null}
     */
    private StreamingFileSink(BucketsBuilder<IN, ?> bucketsBuilder, long bucketCheckInterval) {
        Preconditions.checkArgument(
                bucketCheckInterval > 0L,
                "bucketCheckInterval must be positive, but was " + bucketCheckInterval);
        this.bucketsBuilder = Preconditions.checkNotNull(bucketsBuilder);
        this.bucketCheckInterval = bucketCheckInterval;
    }

    /**
     * Starts building a sink that writes elements one by one with the given encoder.
     *
     * <p>The returned builder uses a {@code DateTimeBucketAssigner}, the policy produced by
     * {@code DefaultRollingPolicy.create().build()} and the default bucket check interval.
     *
     * @param basePath base path handed to the buckets
     * @param encoder encoder handed to the row-wise part writer factory
     * @param <IN> type of the elements to write
     * @return a builder for a row-format sink
     */
    public static <IN> RowFormatBuilder<IN, String> forRowFormat(Path basePath, Encoder<IN> encoder) {
        return new RowFormatBuilder<>(basePath, encoder, new DateTimeBucketAssigner<>());
    }

    /**
     * Starts building a sink that writes elements through a {@link BulkWriter}.
     *
     * <p>The returned builder uses a {@code DateTimeBucketAssigner} and the default bucket check
     * interval. Bulk sinks always use {@code OnCheckpointRollingPolicy}.
     *
     * @param basePath base path handed to the buckets
     * @param writerFactory factory handed to the bulk part writer factory
     * @param <IN> type of the elements to write
     * @return a builder for a bulk-format sink
     */
    public static <IN> BulkFormatBuilder<IN, String> forBulkFormat(Path basePath, BulkWriter.Factory<IN> writerFactory) {
        return new BulkFormatBuilder<>(basePath, writerFactory, new DateTimeBucketAssigner<>());
    }

    /**
     * Creates the buckets for this subtask, acquires both list states and, when the context
     * reports a restore, hands the states to the buckets so they can re-initialize themselves.
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        buckets = bucketsBuilder.createBuckets(subtaskIndex);

        OperatorStateStore stateStore = context.getOperatorStateStore();
        bucketStates = stateStore.getListState(BUCKET_STATE_DESC);
        // The part counter state is requested as *union* list state, unlike the bucket states.
        maxPartCountersState = stateStore.getUnionListState(MAX_PART_COUNTER_STATE_DESC);

        if (context.isRestored()) {
            buckets.initializeState(bucketStates, maxPartCountersState);
        }
    }

    /** Forwards the completed checkpoint id to the buckets. */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        buckets.commitUpToCheckpoint(checkpointId);
    }

    /**
     * Lets the buckets write their state for the given checkpoint into both list states.
     *
     * @throws IllegalStateException if the list states have not been set up by
     *     {@link #initializeState(FunctionInitializationContext)} yet
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        Preconditions.checkState(
                bucketStates != null && maxPartCountersState != null,
                "sink has not been initialized");
        buckets.snapshotState(context.getCheckpointId(), bucketStates, maxPartCountersState);
    }

    /**
     * Looks up the processing time service from the runtime context and registers the first
     * timer, {@code bucketCheckInterval} after the current processing time.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        processingTimeService = ((StreamingRuntimeContext) getRuntimeContext()).getProcessingTimeService();
        long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
        processingTimeService.registerTimer(currentProcessingTime + bucketCheckInterval, this);
    }

    /**
     * Timer callback: notifies the buckets of the current processing time and re-registers the
     * timer. The time passed on is read freshly from the service, not taken from
     * {@code timestamp}.
     */
    @Override
    public void onProcessingTime(long timestamp) throws Exception {
        long currentTime = processingTimeService.getCurrentProcessingTime();
        buckets.onProcessingTime(currentTime);
        processingTimeService.registerTimer(currentTime + bucketCheckInterval, this);
    }

    /** Hands the element and its context to the buckets. */
    @Override
    public void invoke(IN value, SinkFunction.Context context) throws Exception {
        buckets.onElement(value, context);
    }

    /** Closes the buckets if they were ever created; safe to call on a never-initialized sink. */
    @Override
    public void close() throws Exception {
        if (buckets != null) {
            buckets.close();
        }
    }

    /**
     * Immutable builder for a {@link StreamingFileSink} that writes through a {@link BulkWriter}.
     *
     * @param <IN> type of the elements to write
     * @param <BucketID> type of the bucket identifier produced by the bucket assigner
     */
    @PublicEvolving
    public static class BulkFormatBuilder<IN, BucketID> extends BucketsBuilder<IN, BucketID> {

        private static final long serialVersionUID = 1L;

        private final long bucketCheckInterval;
        private final Path basePath;
        private final BulkWriter.Factory<IN> writerFactory;
        private final BucketAssigner<IN, BucketID> bucketAssigner;
        private final BucketFactory<IN, BucketID> bucketFactory;

        /** Creates a builder with the default check interval and default bucket factory. */
        BulkFormatBuilder(Path basePath, BulkWriter.Factory<IN> writerFactory, BucketAssigner<IN, BucketID> assigner) {
            this(basePath, writerFactory, assigner, DEFAULT_BUCKET_CHECK_INTERVAL, new DefaultBucketFactoryImpl<>());
        }

        /**
         * Canonical constructor; every {@code withXyz} method funnels through it.
         *
         * @throws NullPointerException if any reference argument is {@code null}
         */
        private BulkFormatBuilder(
                Path basePath,
                BulkWriter.Factory<IN> writerFactory,
                BucketAssigner<IN, BucketID> assigner,
                long bucketCheckInterval,
                BucketFactory<IN, BucketID> bucketFactory) {
            this.basePath = Preconditions.checkNotNull(basePath);
            // Fail fast here instead of when the part writer factory first uses the null value.
            this.writerFactory = Preconditions.checkNotNull(writerFactory);
            this.bucketAssigner = Preconditions.checkNotNull(assigner);
            // The interval is validated when the sink itself is constructed in build().
            this.bucketCheckInterval = bucketCheckInterval;
            this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
        }

        /**
         * Returns a copy of this builder with a different bucket check interval.
         *
         * @param interval new interval; positivity is only enforced by {@link #build()}
         */
        public BulkFormatBuilder<IN, BucketID> withBucketCheckInterval(long interval) {
            return new BulkFormatBuilder<>(basePath, writerFactory, bucketAssigner, interval, bucketFactory);
        }

        /**
         * Returns a copy of this builder with a different bucket assigner, possibly changing the
         * bucket id type.
         *
         * <p>The copy always gets a fresh {@code DefaultBucketFactoryImpl}; a factory previously
         * set through {@code withBucketFactory} is not carried over.
         *
         * @param assigner new bucket assigner; must not be {@code null}
         * @param <ID> bucket id type of the new assigner
         */
        public <ID> BulkFormatBuilder<IN, ID> withBucketAssigner(BucketAssigner<IN, ID> assigner) {
            return new BulkFormatBuilder<>(
                    basePath,
                    writerFactory,
                    Preconditions.checkNotNull(assigner),
                    bucketCheckInterval,
                    new DefaultBucketFactoryImpl<>());
        }

        /** Returns a copy of this builder with a different bucket factory. */
        @VisibleForTesting
        BulkFormatBuilder<IN, BucketID> withBucketFactory(BucketFactory<IN, BucketID> factory) {
            return new BulkFormatBuilder<>(
                    basePath,
                    writerFactory,
                    bucketAssigner,
                    bucketCheckInterval,
                    Preconditions.checkNotNull(factory));
        }

        /**
         * Creates the sink configured by this builder.
         *
         * @throws IllegalArgumentException if the configured bucket check interval is not positive
         */
        public StreamingFileSink<IN> build() {
            return new StreamingFileSink<>(this, bucketCheckInterval);
        }

        /** Creates buckets backed by a bulk part writer and {@code OnCheckpointRollingPolicy}. */
        @Override
        Buckets<IN, BucketID> createBuckets(int subtaskIndex) throws IOException {
            return new Buckets<>(
                    basePath,
                    bucketAssigner,
                    bucketFactory,
                    new BulkPartWriter.Factory<>(writerFactory),
                    OnCheckpointRollingPolicy.build(),
                    subtaskIndex);
        }
    }

    /**
     * Immutable builder for a {@link StreamingFileSink} that writes elements with an
     * {@link Encoder}.
     *
     * @param <IN> type of the elements to write
     * @param <BucketID> type of the bucket identifier produced by the bucket assigner
     */
    @PublicEvolving
    public static class RowFormatBuilder<IN, BucketID> extends BucketsBuilder<IN, BucketID> {

        private static final long serialVersionUID = 1L;

        private final long bucketCheckInterval;
        private final Path basePath;
        private final Encoder<IN> encoder;
        private final BucketAssigner<IN, BucketID> bucketAssigner;
        private final RollingPolicy<IN, BucketID> rollingPolicy;
        private final BucketFactory<IN, BucketID> bucketFactory;

        /**
         * Creates a builder with the default rolling policy, default check interval and default
         * bucket factory.
         */
        RowFormatBuilder(Path basePath, Encoder<IN> encoder, BucketAssigner<IN, BucketID> bucketAssigner) {
            this(
                    basePath,
                    encoder,
                    bucketAssigner,
                    DefaultRollingPolicy.create().build(),
                    DEFAULT_BUCKET_CHECK_INTERVAL,
                    new DefaultBucketFactoryImpl<>());
        }

        /**
         * Canonical constructor; every {@code withXyz} method funnels through it.
         *
         * @throws NullPointerException if any reference argument is {@code null}
         */
        private RowFormatBuilder(
                Path basePath,
                Encoder<IN> encoder,
                BucketAssigner<IN, BucketID> assigner,
                RollingPolicy<IN, BucketID> policy,
                long bucketCheckInterval,
                BucketFactory<IN, BucketID> bucketFactory) {
            this.basePath = Preconditions.checkNotNull(basePath);
            this.encoder = Preconditions.checkNotNull(encoder);
            this.bucketAssigner = Preconditions.checkNotNull(assigner);
            this.rollingPolicy = Preconditions.checkNotNull(policy);
            // The interval is validated when the sink itself is constructed in build().
            this.bucketCheckInterval = bucketCheckInterval;
            this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
        }

        /**
         * Returns a copy of this builder with a different bucket check interval.
         *
         * @param interval new interval; positivity is only enforced by {@link #build()}
         */
        public RowFormatBuilder<IN, BucketID> withBucketCheckInterval(long interval) {
            return new RowFormatBuilder<>(basePath, encoder, bucketAssigner, rollingPolicy, interval, bucketFactory);
        }

        /**
         * Returns a copy of this builder with a different bucket assigner of the same bucket id
         * type.
         *
         * @param assigner new bucket assigner; must not be {@code null}
         */
        public RowFormatBuilder<IN, BucketID> withBucketAssigner(BucketAssigner<IN, BucketID> assigner) {
            return new RowFormatBuilder<>(
                    basePath,
                    encoder,
                    Preconditions.checkNotNull(assigner),
                    rollingPolicy,
                    bucketCheckInterval,
                    bucketFactory);
        }

        /**
         * Returns a copy of this builder with a different rolling policy.
         *
         * @param policy new rolling policy; must not be {@code null}
         */
        public RowFormatBuilder<IN, BucketID> withRollingPolicy(RollingPolicy<IN, BucketID> policy) {
            return new RowFormatBuilder<>(
                    basePath,
                    encoder,
                    bucketAssigner,
                    Preconditions.checkNotNull(policy),
                    bucketCheckInterval,
                    bucketFactory);
        }

        /**
         * Returns a copy of this builder with a new assigner and policy, possibly changing the
         * bucket id type.
         *
         * <p>The copy always gets a fresh {@code DefaultBucketFactoryImpl}; a factory previously
         * set through {@code withBucketFactory} is not carried over.
         *
         * @param assigner new bucket assigner; must not be {@code null}
         * @param policy new rolling policy; must not be {@code null}
         * @param <ID> bucket id type shared by the new assigner and policy
         */
        public <ID> RowFormatBuilder<IN, ID> withBucketAssignerAndPolicy(BucketAssigner<IN, ID> assigner, RollingPolicy<IN, ID> policy) {
            return new RowFormatBuilder<>(
                    basePath,
                    encoder,
                    Preconditions.checkNotNull(assigner),
                    Preconditions.checkNotNull(policy),
                    bucketCheckInterval,
                    new DefaultBucketFactoryImpl<>());
        }

        /**
         * Creates the sink configured by this builder.
         *
         * @throws IllegalArgumentException if the configured bucket check interval is not positive
         */
        public StreamingFileSink<IN> build() {
            return new StreamingFileSink<>(this, bucketCheckInterval);
        }

        /** Creates buckets backed by a row-wise part writer and the configured rolling policy. */
        @Override
        Buckets<IN, BucketID> createBuckets(int subtaskIndex) throws IOException {
            return new Buckets<>(
                    basePath,
                    bucketAssigner,
                    bucketFactory,
                    new RowWisePartWriter.Factory<>(encoder),
                    rollingPolicy,
                    subtaskIndex);
        }

        /** Returns a copy of this builder with a different bucket factory. */
        @VisibleForTesting
        RowFormatBuilder<IN, BucketID> withBucketFactory(BucketFactory<IN, BucketID> factory) {
            return new RowFormatBuilder<>(
                    basePath,
                    encoder,
                    bucketAssigner,
                    rollingPolicy,
                    bucketCheckInterval,
                    Preconditions.checkNotNull(factory));
        }
    }

    /**
     * Serializable base of both builders. It is what the sink stores, so that the non-serialized
     * {@code Buckets} can be created on the task side.
     *
     * @param <IN> type of the elements to write
     * @param <BucketID> type of the bucket identifier
     */
    private abstract static class BucketsBuilder<IN, BucketID> implements Serializable {

        private static final long serialVersionUID = 1L;

        private BucketsBuilder() {
        }

        /**
         * Creates the buckets for one parallel subtask.
         *
         * @param subtaskIndex index of the subtask the buckets are created for
         * @throws IOException propagated from the {@code Buckets} constructor
         */
        abstract Buckets<IN, BucketID> createBuckets(int subtaskIndex) throws IOException;
    }
}

