/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.stream.flume;

import java.util.ArrayList;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.stream.flume.EventTransformer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IgniteSink
extends AbstractSink
implements Configurable {
    private static final Logger log = LoggerFactory.getLogger(IgniteSink.class);
    private static final int DFLT_BATCH_SIZE = 100;
    private String springCfgPath;
    private String cacheName;
    private String eventTransformerCls;
    private int batchSize;
    private SinkCounter sinkCounter;
    private EventTransformer<Event, Object, Object> eventTransformer;
    private Ignite ignite;

    public void configure(Context context) {
        this.springCfgPath = context.getString("igniteCfg");
        this.cacheName = context.getString("cacheName");
        this.eventTransformerCls = context.getString("eventTransformer");
        this.batchSize = context.getInteger("batchSize", Integer.valueOf(100));
        if (this.sinkCounter == null) {
            this.sinkCounter = new SinkCounter(this.getName());
        }
    }

    public synchronized void start() {
        A.notNull((Object)this.springCfgPath, (String)"Ignite config file");
        A.notNull((Object)this.cacheName, (String)"Cache name");
        A.notNull((Object)this.eventTransformerCls, (String)"Event transformer class");
        this.sinkCounter.start();
        try {
            if (this.ignite == null) {
                this.ignite = Ignition.start((String)this.springCfgPath);
            }
            if (this.eventTransformerCls != null && !this.eventTransformerCls.isEmpty()) {
                Class<?> clazz = Class.forName(this.eventTransformerCls);
                this.eventTransformer = (EventTransformer)clazz.newInstance();
            }
        }
        catch (Exception e) {
            log.error("Failed to start grid", (Throwable)e);
            this.sinkCounter.incrementConnectionFailedCount();
            throw new FlumeException("Failed to start grid", (Throwable)e);
        }
        this.sinkCounter.incrementConnectionCreatedCount();
        super.start();
    }

    public synchronized void stop() {
        if (this.ignite != null) {
            this.ignite.close();
        }
        this.sinkCounter.incrementConnectionClosedCount();
        this.sinkCounter.stop();
        super.stop();
    }

    public Sink.Status process() throws EventDeliveryException {
        int eventCount;
        Channel channel = this.getChannel();
        try (Transaction transaction = channel.getTransaction();){
            Event event;
            transaction.begin();
            ArrayList<Event> batch = new ArrayList<Event>(this.batchSize);
            for (eventCount = 0; eventCount < this.batchSize && (event = channel.take()) != null; ++eventCount) {
                batch.add(event);
            }
            if (!batch.isEmpty()) {
                this.ignite.cache(this.cacheName).putAll(this.eventTransformer.transform(batch));
                if (batch.size() < this.batchSize) {
                    this.sinkCounter.incrementBatchUnderflowCount();
                } else {
                    this.sinkCounter.incrementBatchCompleteCount();
                }
            } else {
                this.sinkCounter.incrementBatchEmptyCount();
            }
            this.sinkCounter.addToEventDrainAttemptCount((long)batch.size());
            transaction.commit();
            this.sinkCounter.addToEventDrainSuccessCount((long)batch.size());
        }
        return eventCount == 0 ? Sink.Status.BACKOFF : Sink.Status.READY;
    }
}

