package org.apache.apex.malhar.lib.dedup;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.lib.util.PojoUtils;
import com.datatorrent.netlet.util.Slice;
import java.util.Arrays;
import java.util.concurrent.Future;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;
import org.apache.apex.malhar.lib.state.managed.MovingBoundaryTimeBucketAssigner;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * Deduper that keys incoming POJOs by a configurable expression and records the keys it has seen
 * in a {@link ManagedTimeStateImpl}.
 *
 * <p>Every tuple is assigned the same constant time ({@code 0}), and keys are spread across
 * {@code numBuckets} buckets derived from a hash of the serialized key. The value stored for each
 * key is an empty {@link Slice}; only the presence of the key is recorded here.
 */
@InterfaceStability.Evolving
public class BoundedDedupOperator extends AbstractDeduper<Object> {
    /** Time reported for every tuple and used for every write to managed state. */
    private static final long DEFAULT_CONSTANT_TIME = 0;

    /** Bucket count used when none is configured (or when it is configured as {@code 0}). */
    private static final int DEFAULT_NUM_BUCKETS = 46340;

    /** Expression evaluated against each tuple to obtain its dedup key; must be set. */
    @NotNull
    private String keyExpression;

    /** Tuple class taken from the input port's {@code TUPLE_CLASS} attribute during port setup. */
    private transient Class<?> pojoClass;

    /** Getter compiled from {@link #keyExpression} in {@link #activate(Context)}. */
    private transient PojoUtils.Getter<Object, Object> keyGetter;

    /** Codec used both as the port's stream codec and to serialize keys in {@link #getKey(Object)}. */
    private transient StreamCodec<Object> streamCodec;

    /** Number of buckets handed to the managed state in {@link #setup(Context.OperatorContext)}. */
    private int numBuckets = DEFAULT_NUM_BUCKETS;

    /**
     * Input port. On setup it captures the tuple class and creates the stream codec; each received
     * tuple is forwarded to {@code processTuple}.
     */
    @InputPortFieldAnnotation(schemaRequired = true)
    public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>() {
        public void setup(Context.PortContext portContext) {
            BoundedDedupOperator.this.pojoClass =
                    (Class<?>) portContext.getAttributes().get(Context.PortContext.TUPLE_CLASS);
            BoundedDedupOperator.this.streamCodec = BoundedDedupOperator.this.getDeduperStreamCodec();
        }

        public void process(Object obj) {
            BoundedDedupOperator.this.processTuple(obj);
        }

        public StreamCodec<Object> getStreamCodec() {
            return BoundedDedupOperator.this.streamCodec;
        }
    };

    /** Creates the operator backed by a fresh {@link ManagedTimeStateImpl}. */
    public BoundedDedupOperator() {
        this.managedState = new ManagedTimeStateImpl();
    }

    /**
     * Configures the managed state (bucket count and a {@link MovingBoundaryTimeBucketAssigner})
     * and then delegates to the superclass setup.
     *
     * @param operatorContext operator context passed through to the superclass
     */
    @Override
    public void setup(Context.OperatorContext operatorContext) {
        // A bucket count of 0 would make the modulo in getBucketId fail; fall back to the default.
        if (this.numBuckets == 0) {
            this.numBuckets = DEFAULT_NUM_BUCKETS;
        }
        ((ManagedTimeStateImpl) this.managedState).setNumBuckets(this.numBuckets);
        this.managedState.setTimeBucketAssigner(new MovingBoundaryTimeBucketAssigner());
        super.setup(operatorContext);
    }

    /**
     * Builds the key getter from the tuple class and {@link #keyExpression}.
     *
     * @param context activation context (unused)
     */
    public void activate(Context context) {
        this.keyGetter = PojoUtils.createGetter(this.pojoClass, this.keyExpression, Object.class);
    }

    /** No-op. */
    public void deactivate() {
    }

    /**
     * Returns the same constant time for every tuple.
     *
     * @param obj the tuple (ignored)
     * @return {@code 0}
     */
    @Override
    protected long getTime(Object obj) {
        return DEFAULT_CONSTANT_TIME;
    }

    /**
     * Extracts the key from the tuple and serializes it with the stream codec.
     *
     * @param obj the tuple
     * @return the serialized key
     */
    @Override
    protected Slice getKey(Object obj) {
        return this.streamCodec.toByteArray(this.keyGetter.get(obj));
    }

    /**
     * Creates the codec used by the input port and for key serialization.
     *
     * @return a new {@code DeduperStreamCodec} built from {@link #keyExpression}
     */
    protected StreamCodec<Object> getDeduperStreamCodec() {
        return new DeduperStreamCodec(this.keyExpression);
    }

    /**
     * Asynchronously looks up the tuple's key in managed state.
     *
     * @param obj the tuple
     * @return future for the value stored under the tuple's key
     */
    @Override
    protected Future<Slice> getAsyncManagedState(Object obj) {
        // Compute the key once: it determines the bucket and is also the lookup key.
        final Slice key = getKey(obj);
        return ((ManagedTimeStateImpl) this.managedState).getAsync(getBucketId(key), key);
    }

    /**
     * Records the tuple's key in managed state with an empty value at the constant time.
     *
     * @param obj the tuple
     */
    @Override
    protected void putManagedState(Object obj) {
        // Compute the key once: it determines the bucket and is also the stored key.
        final Slice key = getKey(obj);
        ((ManagedTimeStateImpl) this.managedState)
                .put(getBucketId(key), DEFAULT_CONSTANT_TIME, key, new Slice(new byte[0]));
    }

    /**
     * Maps a serialized key to a bucket id by hashing its backing array modulo {@code numBuckets}.
     *
     * <p>NOTE(review): {@code %} keeps the sign of the hash, so the result can be negative. It is
     * left unchanged because altering the mapping would move keys to different buckets relative to
     * already-persisted state; confirm whether the managed state tolerates negative ids.
     *
     * <p>NOTE(review): the whole {@code slice.buffer} array is hashed; presumably a slice that is a
     * view into a larger array should only contribute its own range - verify against {@code Slice}.
     *
     * @param slice the serialized key
     * @return the bucket id for the key
     */
    protected int getBucketId(Slice slice) {
        return Arrays.hashCode(slice.buffer) % this.numBuckets;
    }

    /** @return the expression used to extract the dedup key from a tuple */
    public String getKeyExpression() {
        return this.keyExpression;
    }

    /** @param str the expression used to extract the dedup key from a tuple */
    public void setKeyExpression(String str) {
        this.keyExpression = str;
    }

    /** @return the configured number of buckets */
    public int getNumBuckets() {
        return this.numBuckets;
    }

    /** @param i the number of buckets; {@code 0} is replaced by the default during setup */
    public void setNumBuckets(int i) {
        this.numBuckets = i;
    }
}
