/*
 * Decompiled with CFR 0.152.
 */
package org.apache.atlas.falcon.hook;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.atlas.falcon.bridge.FalconBridge;
import org.apache.atlas.falcon.event.FalconEvent;
import org.apache.atlas.falcon.publisher.FalconEventPublisher;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.kafka.NotificationProvider;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.hadoop.util.ShutdownHookManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Atlas hook for Falcon: receives {@link FalconEvent}s through
 * {@link FalconEventPublisher#publish(FalconEventPublisher.Data)}, converts the
 * affected Falcon entity into Atlas {@link Referenceable}s via {@link FalconBridge}
 * and hands the resulting messages to {@code notifyEntities}.
 *
 * <p>Events are handled on the calling thread when {@value #CONF_SYNC} is
 * {@code true}; otherwise they are submitted to a shared thread pool that is
 * configured from the {@code atlas.hook.falcon.*} properties and shut down by a
 * hook registered with {@link ShutdownHookManager}.
 */
public class FalconHook extends AtlasHook implements FalconEventPublisher {
    private static final Logger LOG = LoggerFactory.getLogger(FalconHook.class);

    public static final String CONF_PREFIX = "atlas.hook.falcon.";
    private static final String MIN_THREADS = "atlas.hook.falcon.minThreads";
    private static final String MAX_THREADS = "atlas.hook.falcon.maxThreads";
    private static final String KEEP_ALIVE_TIME = "atlas.hook.falcon.keepAliveTime";
    public static final String QUEUE_SIZE = "atlas.hook.falcon.queueSize";
    public static final String CONF_SYNC = "atlas.hook.falcon.synchronous";
    public static final String HOOK_NUM_RETRIES = "atlas.hook.falcon.numRetries";

    /** Seconds the shutdown hook waits for queued tasks to finish. */
    private static final int WAIT_TIME = 3;

    /** Priority handed to {@link ShutdownHookManager} when registering the shutdown hook. */
    private static final int SHUTDOWN_HOOK_PRIORITY = 30;

    /**
     * Pool used for asynchronous delivery. It is {@code null} if static
     * initialisation failed before it was created and after the shutdown hook
     * has completed, so readers must null-check it.
     */
    private static ExecutorService executor;

    // Defaults applied when the corresponding property is absent.
    private static final int minThreadsDefault = 5;
    private static final int maxThreadsDefault = 5;
    private static final long keepAliveTimeDefault = 10L;
    private static final int queueSizeDefault = 10000;

    /** {@code true} when events must be processed on the caller's thread. */
    private static boolean sync;

    private static ConfigurationStore STORE;

    static {
        try {
            int minThreads = atlasProperties.getInt(MIN_THREADS, minThreadsDefault);
            int maxThreads = atlasProperties.getInt(MAX_THREADS, maxThreadsDefault);
            long keepAliveTime = atlasProperties.getLong(KEEP_ALIVE_TIME, keepAliveTimeDefault);
            int queueSize = atlasProperties.getInt(QUEUE_SIZE, queueSizeDefault);
            sync = atlasProperties.getBoolean(CONF_SYNC, false);

            executor = new ThreadPoolExecutor(
                    minThreads,
                    maxThreads,
                    keepAliveTime,
                    TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>(queueSize),
                    new ThreadFactoryBuilder().setNameFormat("Atlas Logger %d").build());

            ShutdownHookManager.get().addShutdownHook(new Runnable() {
                @Override
                public void run() {
                    try {
                        LOG.info("==> Shutdown of Atlas Falcon Hook");
                        executor.shutdown();
                        executor.awaitTermination(WAIT_TIME, TimeUnit.SECONDS);
                        executor = null;
                    } catch (InterruptedException ie) {
                        LOG.info("Interrupt received in shutdown.");
                        // Restore the flag so whoever runs this hook can still observe the interrupt.
                        Thread.currentThread().interrupt();
                    } finally {
                        LOG.info("<== Shutdown of Atlas Falcon Hook");
                    }
                }
            }, SHUTDOWN_HOOK_PRIORITY);

            STORE = ConfigurationStore.get();
            notificationInterface = NotificationProvider.get();
        } catch (Exception e) {
            LOG.error("Caught exception initializing the falcon hook.", e);
        }
        LOG.info("Created Atlas Hook for Falcon");
    }

    /**
     * Processes the event carried by {@code data}, either inline (synchronous
     * mode) or on the shared executor. Failures raised while processing or
     * submitting are logged and never propagated to the caller.
     *
     * @param data wrapper providing the Falcon event to publish
     */
    @Override
    public void publish(FalconEventPublisher.Data data) {
        final FalconEvent event = data.getEvent();
        try {
            if (sync) {
                fireAndForget(event);
                return;
            }

            // Snapshot the field: the shutdown hook may null it concurrently.
            ExecutorService pool = executor;
            if (pool == null) {
                LOG.warn("Atlas Falcon hook executor is not available; dropping data {}", data);
                return;
            }

            pool.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        fireAndForget(event);
                    } catch (Throwable e) {
                        // WARN, matching the synchronous path, so failures are not hidden at INFO.
                        LOG.warn("Atlas hook failed", e);
                    }
                }
            });
        } catch (Throwable t) {
            // Trailing Throwable is rendered by SLF4J as the exception, not as a placeholder value.
            LOG.warn("Error in processing data {}", data, t);
        }
    }

    /**
     * @return the property key {@value #HOOK_NUM_RETRIES}
     */
    protected String getNumberOfRetriesPropertyKey() {
        return HOOK_NUM_RETRIES;
    }

    /**
     * Builds the notification messages for {@code event} and passes them to
     * {@code notifyEntities}. Only {@link Operation#ADD} produces a message;
     * for {@link Operation#UPDATE} the list is passed on empty.
     *
     * @param event the Falcon event to translate
     * @throws FalconException if the event's operation is not recognised, or
     *         propagated from entity creation
     * @throws URISyntaxException propagated from entity creation
     */
    private void fireAndForget(FalconEvent event) throws FalconException, URISyntaxException {
        LOG.info("Entered Atlas hook for Falcon hook operation {}", event.getOperation());
        ArrayList<HookNotification.EntityCreateRequest> messages = new ArrayList<HookNotification.EntityCreateRequest>();

        Operation op = getOperation(event.getOperation());
        String user = getUser(event.getUser());
        LOG.info("fireAndForget user:{}", user);

        if (op == Operation.ADD) {
            messages.add(new HookNotification.EntityCreateRequest(user, createEntities(event, user)));
        }
        notifyEntities(messages);
    }

    /**
     * Converts the entity attached to {@code event} into Atlas entities using
     * the {@link FalconBridge} factory matching the event's operation.
     *
     * @param event the Falcon event whose entity is converted
     * @param user  the user associated with the event (not used by this method)
     * @return the created entities; empty when the operation is not one of the
     *         handled {@code ADD_*} operations
     * @throws FalconException propagated from {@link FalconBridge}
     * @throws URISyntaxException propagated from {@link FalconBridge}
     */
    private List<Referenceable> createEntities(FalconEvent event, String user) throws FalconException, URISyntaxException {
        List<Referenceable> entities = new ArrayList<Referenceable>();
        switch (event.getOperation()) {
            case ADD_CLUSTER:
                entities.add(FalconBridge.createClusterEntity((Cluster) event.getEntity()));
                break;
            case ADD_PROCESS:
                entities.addAll(FalconBridge.createProcessEntity((Process) event.getEntity(), STORE));
                break;
            case ADD_FEED:
                entities.addAll(FalconBridge.createFeedCreationEntity((Feed) event.getEntity(), STORE));
                break;
            default:
                LOG.info("Falcon operation {} is not valid or supported", event.getOperation());
                break;
        }
        return entities;
    }

    /**
     * Maps a Falcon operation onto the hook's coarse {@link Operation}.
     *
     * @param op the Falcon operation reported by the event
     * @return {@link Operation#ADD} for the {@code ADD_*} operations and
     *         {@link Operation#UPDATE} for the {@code UPDATE_*} operations
     * @throws FalconException if {@code op} is none of the handled operations
     */
    private static Operation getOperation(FalconEvent.OPERATION op) throws FalconException {
        switch (op) {
            case ADD_CLUSTER:
            case ADD_PROCESS:
            case ADD_FEED:
                return Operation.ADD;
            case UPDATE_CLUSTER:
            case UPDATE_FEED:
            case UPDATE_PROCESS:
                return Operation.UPDATE;
            default:
                throw new FalconException("Falcon operation " + op + " is not valid or supported");
        }
    }

    /** Coarse classification of Falcon operations used to pick the notification type. */
    private static enum Operation {
        ADD,
        UPDATE;
    }
}

