package org.apache.fluo.recipes.core.export;

import com.google.common.base.Preconditions;
import com.google.common.hash.Hashing;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Objects;
import java.util.regex.Pattern;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.ObserverSpecification;
import org.apache.fluo.api.config.SimpleConfiguration;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.observer.Observer;
import org.apache.fluo.api.observer.ObserverProvider;
import org.apache.fluo.recipes.core.common.TableOptimizations;
import org.apache.fluo.recipes.core.serialization.SimpleSerializer;

/* loaded from: input_file:org/apache/fluo/recipes/core/export/ExportQueue.class */
public class ExportQueue<K, V> {
    /** Suffix appended to a queue id by {@link Optimizer} to form one of the queue's table split points. */
    static final String RANGE_BEGIN = "#";
    /** Suffix appended to a queue id by {@link Optimizer} to form another of the queue's table split points. */
    static final String RANGE_END = ":~";
    /** Number of buckets; the hash of each serialized key is reduced modulo this value to pick a bucket. */
    private final int numBuckets;
    /** Turns export keys and values into the byte arrays stored in a bucket. */
    private final SimpleSerializer serializer;
    /** Identifier of this queue, taken from the configurator the queue was built from. */
    private final String queueId;
    /** Configurator this queue was built from; consulted for the exporter type and passed to the observer. */
    private final FluentConfigurator opts;

    /* loaded from: input_file:org/apache/fluo/recipes/core/export/ExportQueue$FluentArg1.class */
    /**
     * First step of the fluent configuration chain handed out by
     * {@link ExportQueue#configure(String)}: selects the key type of the queue.
     */
    public interface FluentArg1 {
        /**
         * Selects the key type by name.
         *
         * @param keyTypeName name of the key type (the {@code Options} constructors pass {@code Class.getName()} here)
         * @return the next step of the chain, which selects the value type
         */
        FluentArg2 keyType(String keyTypeName);

        /**
         * Selects the key type by class.
         *
         * @param keyClass class of the keys
         * @return the next step of the chain, which selects the value type
         */
        FluentArg2 keyType(Class<?> keyClass);
    }

    /* loaded from: input_file:org/apache/fluo/recipes/core/export/ExportQueue$FluentArg2.class */
    /**
     * Second step of the fluent configuration chain: selects the value type of the queue.
     */
    public interface FluentArg2 {
        /**
         * Selects the value type by name.
         *
         * @param valueTypeName name of the value type (the {@code Options} constructors pass {@code Class.getName()} here)
         * @return the next step of the chain, which selects the number of buckets
         */
        FluentArg3 valueType(String valueTypeName);

        /**
         * Selects the value type by class.
         *
         * @param valueClass class of the values
         * @return the next step of the chain, which selects the number of buckets
         */
        FluentArg3 valueType(Class<?> valueClass);
    }

    /* loaded from: input_file:org/apache/fluo/recipes/core/export/ExportQueue$FluentArg3.class */
    /**
     * Third step of the fluent configuration chain: selects how many buckets the queue uses.
     */
    public interface FluentArg3 {
        /**
         * Selects the number of buckets.
         *
         * @param numBuckets number of buckets exports are hashed into
         * @return the optional settings step of the chain
         */
        FluentOptions buckets(int numBuckets);
    }

    /* loaded from: input_file:org/apache/fluo/recipes/core/export/ExportQueue$FluentOptions.class */
    /**
     * Final step of the fluent configuration chain: optional tuning settings and saving.
     */
    public interface FluentOptions {
        /**
         * Sets the buffer size.
         *
         * @param size buffer size (NOTE(review): unit is not visible in this file, presumably bytes; confirm)
         * @return this step, so further options can be chained
         */
        FluentOptions bufferSize(long size);

        /**
         * Sets how many buckets share a tablet; {@link Optimizer} emits one split point every
         * this-many buckets.
         *
         * @param bucketsPerTablet number of buckets per tablet
         * @return this step, so further options can be chained
         */
        FluentOptions bucketsPerTablet(int bucketsPerTablet);

        /**
         * Saves the chosen settings.
         *
         * @param fluoConfig configuration to save into (NOTE(review): presumably written to its
         *     application configuration; confirm against the implementation)
         */
        void save(FluoConfiguration fluoConfig);
    }

    /* loaded from: input_file:org/apache/fluo/recipes/core/export/ExportQueue$Optimizer.class */
    /**
     * Builds the table optimizations (split points and tablet grouping regex) for one export queue.
     */
    public static class Optimizer implements TableOptimizations.TableOptimizationsFactory {
        /**
         * Computes the split points and tablet grouping regex for the queue identified by
         * {@code queueId}.
         *
         * <p>The splits are the queue id followed by {@link ExportQueue#RANGE_BEGIN}, the queue id
         * followed by {@link ExportQueue#RANGE_END}, and then, in sorted order, one bucket row for
         * every {@code bucketsPerTablet}-th bucket below the configured bucket count.
         *
         * @param queueId id of the export queue whose settings are loaded
         * @param config configuration the queue settings are loaded from
         * @return optimizations carrying the splits and a regex quoting {@code queueId + ":"}
         * @throws IllegalStateException if the configured buckets-per-tablet value is not positive
         */
        @Override
        public TableOptimizations getTableOptimizations(String queueId, SimpleConfiguration config) {
            FluentConfigurator opts = FluentConfigurator.load(queueId, config);

            // A non-positive step would never reach the bucket count and the loop below would not end.
            int bucketsPerTablet = opts.getBucketsPerTablet();
            if (bucketsPerTablet <= 0) {
                throw new IllegalStateException(
                        "bucketsPerTablet must be positive for export queue " + queueId + ", was " + bucketsPerTablet);
            }

            ArrayList<Bytes> splits = new ArrayList<>();
            splits.add(Bytes.of(opts.queueId + ExportQueue.RANGE_BEGIN));
            splits.add(Bytes.of(opts.queueId + ExportQueue.RANGE_END));

            // One split per group of bucketsPerTablet buckets; the first group needs no leading split.
            ArrayList<Bytes> bucketSplits = new ArrayList<>();
            for (int bucket = bucketsPerTablet; bucket < opts.buckets; bucket += bucketsPerTablet) {
                bucketSplits.add(ExportBucket.generateBucketRow(opts.queueId, bucket, opts.buckets));
            }
            Collections.sort(bucketSplits);
            splits.addAll(bucketSplits);

            TableOptimizations optimizations = new TableOptimizations();
            optimizations.setSplits(splits);
            optimizations.setTabletGroupingRegex(Pattern.quote(queueId + ":"));
            return optimizations;
        }
    }

    /* loaded from: input_file:org/apache/fluo/recipes/core/export/ExportQueue$Options.class */
    /**
     * Settings of a single export queue: the core queue settings (held in a
     * {@code FluentConfigurator}) plus an optional exporter-specific configuration.
     */
    public static class Options {
        private static final String PREFIX = "recipes.exportQueue.";
        FluentConfigurator fluentCfg;
        SimpleConfiguration exporterConfig;

        /**
         * Loads the options of a queue from a configuration. The exporter configuration is the
         * subset of {@code config} under {@code recipes.exportQueue.<queueId>.exporterCfg}.
         *
         * <p>NOTE(review): the decompiler reports that this constructor was originally
         * package-private and that its access modifier was changed.
         *
         * @param queueId id of the queue to load
         * @param config configuration to read from
         */
        public Options(String queueId, SimpleConfiguration config) {
            this.fluentCfg = FluentConfigurator.load(queueId, config);
            this.exporterConfig = config.subset(exporterCfgPrefix(queueId));
        }

        /**
         * Creates options from type names.
         *
         * @param queueId id of the queue
         * @param exporterType name of the exporter type; must not be null
         * @param keyType name of the key type
         * @param valueType name of the value type
         * @param buckets number of buckets; must be greater than zero
         */
        public Options(String queueId, String exporterType, String keyType, String valueType, int buckets) {
            this(queueId, keyType, valueType, buckets);
            this.fluentCfg.exporterType = Objects.requireNonNull(exporterType);
        }

        /**
         * Creates options from classes; each class is passed on by its {@code getName()}.
         *
         * @param queueId id of the queue
         * @param exporterType exporter class
         * @param keyType key class
         * @param valueType value class
         * @param buckets number of buckets; must be greater than zero
         */
        public <K, V> Options(String queueId, Class<? extends Exporter<K, V>> exporterType, Class<K> keyType,
                Class<V> valueType, int buckets) {
            this(queueId, exporterType.getName(), keyType.getName(), valueType.getName(), buckets);
        }

        /**
         * Creates options without an exporter type.
         *
         * @param queueId id of the queue
         * @param keyType name of the key type
         * @param valueType name of the value type
         * @param buckets number of buckets; must be greater than zero
         */
        Options(String queueId, String keyType, String valueType, int buckets) {
            Preconditions.checkArgument(buckets > 0);
            this.fluentCfg = (FluentConfigurator) new FluentConfigurator(queueId)
                    .keyType(keyType)
                    .valueType(valueType)
                    .buckets(buckets);
        }

        /** Builds the configuration key prefix under which a queue's exporter settings live. */
        private static String exporterCfgPrefix(String queueId) {
            return PREFIX + queueId + ".exporterCfg";
        }

        /**
         * Forwards the buffer size to the underlying configurator.
         *
         * @param bufferSize buffer size to set
         * @return this, for chaining
         */
        public Options setBufferSize(long bufferSize) {
            this.fluentCfg.bufferSize(bufferSize);
            return this;
        }

        /** @return the buffer size reported by the underlying configurator */
        long getBufferSize() {
            return this.fluentCfg.getBufferSize();
        }

        /**
         * Forwards the buckets-per-tablet setting to the underlying configurator.
         *
         * @param bucketsPerTablet number of buckets per tablet
         * @return this, for chaining
         */
        public Options setBucketsPerTablet(int bucketsPerTablet) {
            this.fluentCfg.bucketsPerTablet(bucketsPerTablet);
            return this;
        }

        /** @return the buckets-per-tablet value reported by the underlying configurator */
        int getBucketsPerTablet() {
            return this.fluentCfg.getBucketsPerTablet();
        }

        /**
         * Replaces the exporter-specific configuration.
         *
         * @param config new exporter configuration; must not be null
         * @return this, for chaining
         */
        public Options setExporterConfiguration(SimpleConfiguration config) {
            Objects.requireNonNull(config);
            this.exporterConfig = config;
            return this;
        }

        /** @return the exporter configuration, or a new empty configuration when none was set */
        public SimpleConfiguration getExporterConfiguration() {
            if (this.exporterConfig != null) {
                return this.exporterConfig;
            }
            return new SimpleConfiguration();
        }

        /** @return the queue id held by the underlying configurator */
        public String getQueueId() {
            return this.fluentCfg.queueId;
        }

        /**
         * Saves the queue settings into {@code appConfig} and, when an exporter configuration is
         * present, copies each of its raw values under
         * {@code recipes.exportQueue.<queueId>.exporterCfg.<key>}.
         *
         * @param appConfig configuration to write into
         */
        void save(SimpleConfiguration appConfig) {
            this.fluentCfg.save(appConfig);
            if (this.exporterConfig == null) {
                return;
            }
            String keyPrefix = exporterCfgPrefix(this.fluentCfg.queueId) + ".";
            for (Iterator<String> keys = this.exporterConfig.getKeys(); keys.hasNext();) {
                String key = keys.next();
                appConfig.setProperty(keyPrefix + key, this.exporterConfig.getRawString(key));
            }
        }
    }

    /**
     * Creates a queue from an already loaded configurator.
     *
     * @param options configurator supplying the queue id and bucket count; also retained as-is
     * @param keyValueSerializer serializer used for export keys and values
     */
    ExportQueue(FluentConfigurator options, SimpleSerializer keyValueSerializer) throws Exception {
        this.opts = options;
        this.serializer = keyValueSerializer;
        this.queueId = options.queueId;
        this.numBuckets = options.buckets;
    }

    /**
     * Queues a single key/value pair for export by delegating to
     * {@link #addAll(TransactionBase, Iterator)} with a one-element iterator.
     *
     * @param tx transaction the export is written with
     * @param key export key
     * @param value export value
     */
    public void add(TransactionBase tx, K key, V value) {
        Export<K, V> single = new Export<>(key, value);
        addAll(tx, Collections.singleton(single).iterator());
    }

    /**
     * Queues every export produced by the iterator.
     *
     * <p>Each export is serialized, routed to a bucket chosen from the hash of its serialized key,
     * and added to that bucket under the transaction's start timestamp. The export observer of a
     * bucket is notified only the first time that bucket is touched within this call.
     *
     * @param tx transaction the exports are written with
     * @param exports exports to queue
     */
    public void addAll(TransactionBase tx, Iterator<Export<K, V>> exports) {
        HashSet<Integer> notifiedBuckets = new HashSet<>();
        while (exports.hasNext()) {
            Export<K, V> export = exports.next();
            byte[] keyBytes = this.serializer.serialize(export.getKey());
            byte[] valueBytes = this.serializer.serialize(export.getValue());

            int bucketId = bucketFor(keyBytes);
            ExportBucket bucket = new ExportBucket(tx, this.queueId, bucketId, this.numBuckets);
            bucket.add(tx.getStartTimestamp(), keyBytes, valueBytes);

            // Set.add returns true only for a bucket not seen before in this call.
            if (notifiedBuckets.add(bucketId)) {
                bucket.notifyExportObserver();
            }
        }
    }

    /**
     * Maps a serialized key to a bucket index: the absolute value of the key's murmur3 32-bit hash
     * modulo the bucket count. The remainder's magnitude is below the bucket count, so
     * {@code Math.abs} cannot overflow here.
     */
    private int bucketFor(byte[] keyBytes) {
        int hash = Hashing.murmur3_32().hashBytes(keyBytes).asInt();
        return Math.abs(hash % this.numBuckets);
    }

    /**
     * Builds the queue identified by {@code queueId} from settings found in {@code config}; the
     * serializer is obtained from the same configuration.
     *
     * @param queueId id of the queue to load
     * @param config configuration holding the queue settings
     * @return a new queue instance
     * @throws RuntimeException wrapping any exception raised while loading or constructing
     */
    public static <K2, V2> ExportQueue<K2, V2> getInstance(String queueId, SimpleConfiguration config) {
        try {
            FluentConfigurator options = FluentConfigurator.load(queueId, config);
            SimpleSerializer keyValueSerializer = SimpleSerializer.getInstance(config);
            return new ExportQueue<>(options, keyValueSerializer);
        } catch (Exception cause) {
            throw new RuntimeException(cause);
        }
    }

    /**
     * Starts the fluent configuration chain for a queue.
     *
     * @param queueId id of the queue to configure; must not be null
     * @return the first step of the chain, which selects the key type
     * @throws NullPointerException if {@code queueId} is null
     */
    public static FluentArg1 configure(String queueId) {
        String checkedId = Objects.requireNonNull(queueId);
        return new FluentConfigurator(checkedId);
    }

    /**
     * Saves {@code options} into the application configuration of {@code fluoConfig} and then adds
     * an {@code ExportObserver} specification carrying the queue id under the key {@code queueId}.
     *
     * @param fluoConfig Fluo configuration to update
     * @param options queue options to save
     * @deprecated NOTE(review): presumably superseded by {@link #configure(String)} together with
     *     {@code registerObserver}; confirm against the project documentation.
     */
    @Deprecated
    public static void configure(FluoConfiguration fluoConfig, Options options) {
        SimpleConfiguration appConfig = fluoConfig.getAppConfiguration();
        options.save(appConfig);

        String observerClass = ExportObserver.class.getName();
        ObserverSpecification observerSpec =
                new ObserverSpecification(observerClass, Collections.singletonMap("queueId", options.fluentCfg.queueId));
        fluoConfig.addObserver(observerSpec);
    }

    /**
     * Registers an observer for this queue that hands exports to {@code exporter}.
     *
     * <p>The observer is registered on this queue's notification column with weak notifications
     * and the id {@code "exportq-" + queueId}.
     *
     * @param registry registry the observer is added to
     * @param exporter exporter that receives the queued exports
     * @throws IllegalStateException if an exporter type was already set on this queue's options,
     *     since the two ways of configuring exporters cannot be combined
     * @throws RuntimeException wrapping any exception raised while registering
     */
    public void registerObserver(ObserverProvider.Registry registry, org.apache.fluo.recipes.core.export.function.Exporter<K, V> exporter) {
        // Message corrected: the previous text read "not be set" and "Cannot not use".
        Preconditions.checkState(this.opts.exporterType == null, "Expected exporter type not to be set, it was set to %s.  Cannot use the old and new way of configuring exporters at the same time.", this.opts.exporterType);
        try {
            registry.forColumn(ExportBucket.newNotificationColumn(this.queueId), Observer.NotificationType.WEAK)
                    .withId("exportq-" + this.queueId)
                    .useObserver(new ExportObserverImpl(this.queueId, this.opts, this.serializer, exporter));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
