/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.gyrex.eventbus.websocket.internal;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.core.runtime.IProgressMonitor;
import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.Status;
import org.eclipse.core.runtime.jobs.Job;
import org.eclipse.gyrex.cloud.admin.ICloudManager;
import org.eclipse.gyrex.cloud.admin.INodeDescriptor;
import org.eclipse.gyrex.cloud.admin.INodeListener;
import org.eclipse.gyrex.cloud.services.events.EventMessage;
import org.eclipse.gyrex.cloud.services.events.IEventReceiver;
import org.eclipse.gyrex.cloud.services.events.IEventTransport;
import org.eclipse.gyrex.eventbus.websocket.internal.EventMessageReceiver;
import org.eclipse.gyrex.eventbus.websocket.internal.EventMessageSender;
import org.eclipse.gyrex.server.Platform;
import org.eclipse.gyrex.server.settings.SystemSetting;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.server.WebSocketHandler;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WebsocketEventTransport
implements IEventTransport {
    private static final Logger LOG = LoggerFactory.getLogger(WebsocketEventTransport.class);
    private static final SystemSetting<Integer> eventsPort = SystemSetting.newIntegerSetting((String)"gyrex.event.websocket.port", (String)"Default port for web socket based event transport.").usingDefault((Object)Platform.getInstancePort((int)3111)).create();
    private static final AtomicReference<Server> serverRef = new AtomicReference();
    private static final AtomicReference<WebSocketClient> clientRef = new AtomicReference();
    private final ConcurrentMap<String, CopyOnWriteArrayList<IEventReceiver>> eventReceiverListsByTopicId;
    private volatile ICloudManager cloudManager;
    private volatile Map<String, EventMessageSender> connectedNodesByNodeId = Collections.emptyMap();
    private final INodeListener reconnectListener = new INodeListener(){

        public void nodesChanged() {
            Job connectionMonitor = WebsocketEventTransport.this.connectionMonitor;
            if (connectionMonitor != null) {
                connectionMonitor.schedule();
            }
        }
    };
    volatile Job connectionMonitor;

    public WebsocketEventTransport() {
        this.eventReceiverListsByTopicId = new ConcurrentHashMap<String, CopyOnWriteArrayList<IEventReceiver>>();
    }

    public void activate(ComponentContext context) {
        LOG.info("Activating WebsocketEventTransport.");
        this.startWebSocketServer();
        try {
            this.startWebSocketClient();
            try {
                this.getCloudManager().addNodeListener(this.reconnectListener);
                this.connectionMonitor = new Job("Event Bus Connection Monitor"){

                    protected IStatus run(IProgressMonitor monitor) {
                        try {
                            LOG.debug("Connecting online nodes for the event transport.");
                            WebsocketEventTransport.this.connectAllOnlineNodes();
                        }
                        catch (Exception e) {
                            LOG.warn("An error occured while connecting online nodes for the event transport. Operation will be retried.");
                        }
                        if (!monitor.isCanceled()) {
                            this.schedule(120000L);
                        }
                        return Status.OK_STATUS;
                    }
                };
                this.connectionMonitor.setSystem(true);
                this.connectionMonitor.schedule();
            }
            catch (LinkageError | RuntimeException e) {
                this.stopWebSocketClient();
                throw e;
            }
        }
        catch (LinkageError | RuntimeException e) {
            this.stopWebSocketServer();
            throw e;
        }
    }

    private void connectAllOnlineNodes() {
        HashMap<String, EventMessageSender> existingConnections = new HashMap<String, EventMessageSender>(this.connectedNodesByNodeId);
        HashMap<String, EventMessageSender> newConnections = new HashMap<String, EventMessageSender>();
        ImmutableMap approvedNodesById = Maps.uniqueIndex((Iterable)this.getCloudManager().getApprovedNodes(), (Function)new Function<INodeDescriptor, String>(){

            public String apply(INodeDescriptor input) {
                return input.getId();
            }
        });
        WebSocketClient client = clientRef.get();
        if (client == null) {
            return;
        }
        for (String nodeId : this.getCloudManager().getOnlineNodes()) {
            INodeDescriptor descriptor;
            if (nodeId.equals(this.getCloudManager().getLocalInfo().getNodeId()) || (descriptor = (INodeDescriptor)approvedNodesById.get((Object)nodeId)) == null) continue;
            EventMessageSender sender = (EventMessageSender)((Object)existingConnections.remove(nodeId));
            if (sender != null && sender.isConnected()) {
                newConnections.put(nodeId, sender);
                continue;
            }
            sender = new EventMessageSender(this.getCloudManager().getLocalInfo().getNodeId());
            if (!this.connectToNextAvailableAddress(new ArrayList(descriptor.getAddresses()).iterator(), sender, client, nodeId)) continue;
            newConnections.put(nodeId, sender);
        }
        this.connectedNodesByNodeId = newConnections;
        for (EventMessageSender sender : existingConnections.values()) {
            if (!sender.isConnected()) continue;
            Session session = sender.getSession();
            session.close();
        }
    }

    private boolean connectToNextAvailableAddress(Iterator<String> addresses, EventMessageSender sender, WebSocketClient client, String nodeId) {
        if (!addresses.hasNext()) {
            LOG.error("No routes available to node ({}). Unable to connect event transport. Please check node addresses configuration.", (Object)nodeId);
            return false;
        }
        String address = addresses.next();
        addresses.remove();
        try {
            URI echoUri = new URI("ws://" + address + ":" + eventsPort.get() + "/eventbus/");
            client.connect((Object)sender, echoUri).get();
            return true;
        }
        catch (CancellationException e) {
            LOG.debug("Aborted while connecting to node ({}) at ({}).", (Object)nodeId, (Object)address);
            return false;
        }
        catch (IOException | URISyntaxException | ExecutionException e) {
            LOG.warn("Unable to connect to ({}). {}", new Object[]{address, e.getMessage(), e});
            return this.connectToNextAvailableAddress(addresses, sender, client, nodeId);
        }
        catch (InterruptedException e) {
            LOG.debug("Interrupted while connecting to node ({}) at ({}).", (Object)nodeId, (Object)address);
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public void deactivate(ComponentContext context) {
        LOG.info("Deactivating WebsocketEventTransport.");
        this.getCloudManager().removeNodeListener(this.reconnectListener);
        Job connectionMonitor = this.connectionMonitor;
        if (connectionMonitor != null) {
            connectionMonitor.cancel();
            connectionMonitor = null;
        }
        Map<String, EventMessageSender> nodesByNodeId = this.connectedNodesByNodeId;
        this.connectedNodesByNodeId = Collections.emptyMap();
        for (EventMessageSender sender : nodesByNodeId.values()) {
            sender.getSession().close();
        }
        this.stopWebSocketServer();
        this.stopWebSocketClient();
    }

    @VisibleForTesting
    void distributeEventToSubscribers(String topicId, EventMessage message) {
        CopyOnWriteArrayList receivers = (CopyOnWriteArrayList)this.eventReceiverListsByTopicId.get(topicId);
        if (receivers != null) {
            for (IEventReceiver eventReceiver : receivers) {
                eventReceiver.receiveEvent(message);
            }
        }
    }

    public ICloudManager getCloudManager() {
        ICloudManager manager = this.cloudManager;
        Preconditions.checkState((manager != null ? 1 : 0) != 0, (Object)"inactive");
        return manager;
    }

    public void sendEvent(String topicId, EventMessage eventMessage, Map<String, ?> properties) {
        this.distributeEventToSubscribers(topicId, eventMessage);
        Map<String, EventMessageSender> connectedNodesByNodeId = this.connectedNodesByNodeId;
        for (Map.Entry<String, EventMessageSender> e : connectedNodesByNodeId.entrySet()) {
            LOG.trace("Sending event ({}) to ({})", (Object)eventMessage.getId(), (Object)e.getKey());
            if (e.getValue().isConnected()) {
                e.getValue().sendEvent(topicId, eventMessage);
                continue;
            }
            LOG.warn("Dead connection to node ({}).", (Object)e.getKey());
        }
    }

    public void setCloudManager(ICloudManager cloudManager) {
        this.cloudManager = cloudManager;
    }

    private void startWebSocketClient() {
        try {
            WebSocketClient client = new WebSocketClient();
            Preconditions.checkState((boolean)clientRef.compareAndSet(null, client), (Object)"Only one active transport allowed!");
            client.setAsyncWriteTimeout(5000L);
            client.setConnectTimeout(5000L);
            client.start();
        }
        catch (Exception e) {
            throw new IllegalStateException("Error starting web socket client for cluster event websocket", e);
        }
    }

    private void startWebSocketServer() {
        try {
            Server server = new Server();
            Preconditions.checkState((boolean)serverRef.compareAndSet(null, server), (Object)"Only one active transport allowed!");
            HttpConfiguration httpConfiguration = new HttpConfiguration();
            httpConfiguration.setSendServerVersion(false);
            httpConfiguration.setSendDateHeader(false);
            ServerConnector connector = new ServerConnector(server, new ConnectionFactory[]{new HttpConnectionFactory(httpConfiguration)});
            connector.setPort(((Integer)eventsPort.get()).intValue());
            connector.setIdleTimeout(60000L);
            server.addConnector((Connector)connector);
            server.setStopAtShutdown(true);
            server.setStopTimeout(5000L);
            HandlerCollection handlers = new HandlerCollection();
            final EventMessageReceiver singletonReceiver = new EventMessageReceiver(this.getCloudManager().getLocalInfo().getNodeId(), new EventMessageReceiver.IEventMessageCallback(){

                @Override
                public void onEventMessage(String topicId, EventMessage message) {
                    WebsocketEventTransport.this.distributeEventToSubscribers(topicId, message);
                }
            });
            WebSocketHandler wsHandler = new WebSocketHandler(){

                public void configure(WebSocketServletFactory factory) {
                    factory.setCreator(new WebSocketCreator(){

                        public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) {
                            return singletonReceiver;
                        }
                    });
                }
            };
            ContextHandler context = new ContextHandler();
            context.setContextPath("/eventbus");
            context.setHandler((Handler)wsHandler);
            handlers.addHandler((Handler)context);
            server.setHandler((Handler)handlers);
            server.start();
        }
        catch (Exception e) {
            throw new IllegalStateException("Error starting jetty for cluster event websocket", e);
        }
    }

    private void stopWebSocketClient() {
        try {
            WebSocketClient client = clientRef.getAndSet(null);
            if (client != null) {
                client.stop();
            }
        }
        catch (Exception e) {
            LOG.error("Error stopping websocket client.", (Throwable)e);
        }
    }

    private void stopWebSocketServer() {
        try {
            Server server = serverRef.getAndSet(null);
            if (server != null) {
                server.stop();
            }
        }
        catch (Exception e) {
            LOG.error("Error stopping websocket server.", (Throwable)e);
        }
    }

    public void subscribeTopic(String topicId, IEventReceiver receiver, Map<String, ?> properties) {
        CopyOnWriteArrayList<IEventReceiver> eventReceiverList = (CopyOnWriteArrayList<IEventReceiver>)this.eventReceiverListsByTopicId.get(topicId);
        while (eventReceiverList == null) {
            eventReceiverList = this.eventReceiverListsByTopicId.putIfAbsent(topicId, new CopyOnWriteArrayList());
        }
        eventReceiverList.add(receiver);
    }

    public void unsubscribeTopic(String topicId, IEventReceiver receiver, Map<String, ?> properties) {
        CopyOnWriteArrayList eventReceiverList = (CopyOnWriteArrayList)this.eventReceiverListsByTopicId.get(topicId);
        if (eventReceiverList != null) {
            eventReceiverList.remove(receiver);
        }
    }
}

