/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.flux.client.impl;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;
import org.eclipse.flux.client.FluxClient;
import org.eclipse.flux.client.config.FluxConfig;
import org.eclipse.flux.client.config.RabbitMQFluxConfig;
import org.eclipse.flux.client.config.UserPermissions;
import org.eclipse.flux.client.impl.AbstractMessageConnector;
import org.eclipse.flux.client.impl.DeliveryTypes;
import org.eclipse.flux.client.util.Console;
import org.eclipse.flux.client.util.JSON;
import org.json.JSONObject;

/**
 * Message connector that exchanges Flux messages through a RabbitMQ broker.
 *
 * <p>Each instance opens its own connection and channel, declares a
 * server-named queue (the "inbox") on which it consumes messages, and declares
 * a topic exchange named {@code flux} (the "outbox"). Connecting to a Flux
 * channel binds the inbox to the outbox using a routing pattern derived from
 * the channel name; disconnecting removes that binding.
 *
 * <p>Every encoded message carries the name of the sender's inbox in its
 * {@code origin} field, which lets the consumer drop messages this connector
 * sent itself.
 */
public class RabbitMQMessageConnector
extends AbstractMessageConnector {
    /** Routing key bound by every connector in order to receive broadcasts. */
    private static final String EVERYONE = "$all$";
    private static final Console console = Console.get(RabbitMQMessageConnector.class.getName());

    private final RabbitMQFluxConfig conf;
    /** Created lazily by {@link #connectionFactory()}. */
    private ConnectionFactory factory;
    private UserPermissions permissions;
    /** Set to {@code null} once the connector has been disconnected. */
    private Connection connection;
    // The three fields below are package-visible and deliberately left
    // non-final so that same-package collaborators keep working unchanged.
    Channel channel;
    String inbox;
    String outbox;
    private final DeliveryTypes deliveryTypes = DeliveryTypes.DEFAULTS;
    /** Names of the Flux channels currently bound; guarded by {@code this}. */
    private final Set<String> connectedChannels = new HashSet<>();

    /**
     * Returns the connection factory, creating it and applying the
     * configuration to it on first use.
     *
     * @return the configured connection factory
     * @throws Exception if the configuration cannot be applied
     */
    private ConnectionFactory connectionFactory() throws Exception {
        if (this.factory == null) {
            ConnectionFactory created = new ConnectionFactory();
            this.conf.applyTo(created);
            this.factory = created;
        }
        return this.factory;
    }

    /**
     * Connects to the broker described by {@code conf}, then creates the
     * inbox queue and outbox exchange and binds the broadcast routing key.
     *
     * <p>If any step after the connection has been opened fails, the
     * connection is closed again before the failure propagates, so a
     * half-constructed connector does not leak a broker connection.
     *
     * @param client the client whose executor runs the asynchronous operations
     * @param conf the RabbitMQ configuration to connect with
     * @throws Exception if connecting or any of the setup steps fails
     */
    public RabbitMQMessageConnector(FluxClient client, RabbitMQFluxConfig conf) throws Exception {
        super(client.getExecutor());
        this.conf = conf;
        this.connection = this.connectionFactory().newConnection();
        boolean initialized = false;
        try {
            console.log("Connected to rabbitMQ: " + conf.getURI());
            this.permissions = conf.permissions();
            this.channel = this.connection.createChannel();
            this.inbox = this.createInbox();
            this.outbox = this.createOutbox();
            this.receiveBroadcasts();
            initialized = true;
        }
        finally {
            if (!initialized) {
                // Nobody will ever hold a reference to this instance, so the
                // connection has to be released here.
                this.closeConnectionQuietly();
            }
        }
    }

    /**
     * Closes the connection after a failed construction. Any failure while
     * closing is only logged so that it cannot mask the original exception.
     */
    private void closeConnectionQuietly() {
        Connection failed = this.connection;
        this.connection = null;
        if (failed == null) {
            return;
        }
        try {
            failed.close();
        }
        catch (Exception e) {
            console.log(e);
        }
    }

    /**
     * Binds the inbox to the outbox with the broadcast routing key.
     *
     * @throws IOException if the binding cannot be created
     */
    private void receiveBroadcasts() throws IOException {
        this.channel.queueBind(this.inbox, this.outbox, EVERYONE);
        console.log("Connected to topic " + EVERYONE);
    }

    /**
     * Declares a server-named, non-durable, non-exclusive, auto-delete queue
     * and starts consuming from it.
     *
     * <p>Incoming bodies are parsed as JSON. Messages whose {@code origin}
     * equals this inbox are dropped; all others are handed to
     * {@code handleIncomingMessage} with their {@code type} and {@code data}.
     *
     * @return the name the broker assigned to the queue
     * @throws IOException if the queue cannot be declared or consumed
     */
    private String createInbox() throws IOException {
        AMQP.Queue.DeclareOk declared = this.channel.queueDeclare("", false, false, true, null);
        final String inbox = declared.getQueue();
        console.log("Inbox created: " + inbox);
        // autoAck=true: the handler below never acknowledges a delivery, so with
        // manual acknowledgements every message would stay unacknowledged on the
        // broker for as long as the channel is open.
        this.channel.basicConsume(inbox, true, new DefaultConsumer(this.channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    JSONObject obj = JSON.parse(body);
                    if (!this.isSelfOriginated(obj)) {
                        RabbitMQMessageConnector.this.handleIncomingMessage(obj.getString("type"), obj.getJSONObject("data"));
                    }
                }
                catch (Exception e) {
                    // A malformed message must not terminate the consumer.
                    console.log(e);
                }
            }

            /**
             * Tells whether the message was sent by this connector. A message
             * whose origin cannot be read is logged and treated as foreign.
             */
            private boolean isSelfOriginated(JSONObject obj) {
                try {
                    String origin = obj.getString("origin");
                    return inbox.equals(origin);
                }
                catch (Exception e) {
                    console.log(e);
                    return false;
                }
            }
        });
        return inbox;
    }

    /**
     * Declares the topic exchange named {@code flux}.
     *
     * @return the name of the exchange
     * @throws IOException if the exchange cannot be declared
     */
    private String createOutbox() throws IOException {
        String outbox = "flux";
        this.channel.exchangeDeclare(outbox, "topic");
        console.log("Outbox created");
        return outbox;
    }

    /**
     * Converts a Flux channel name into the routing pattern used for binding.
     * The name {@code $super$} maps to the {@code *} wildcard; every other
     * name is used unchanged.
     *
     * @param fluxChannelName the Flux channel name
     * @return the routing pattern for that channel
     * @throws IllegalArgumentException if the name contains '.', '#' or '*'
     */
    private static String channelNameToTopicPattern(String fluxChannelName) {
        RabbitMQMessageConnector.checkValidChannel(fluxChannelName);
        if (fluxChannelName.equals("$super$")) {
            return "*";
        }
        return fluxChannelName;
    }

    /**
     * Rejects channel names containing a character that has a special
     * meaning in a topic routing pattern.
     *
     * @param fluxChannelName the Flux channel name to check
     * @throws IllegalArgumentException naming the first '.', '#' or '*' found
     */
    private static void checkValidChannel(String fluxChannelName) {
        int len = fluxChannelName.length();
        for (int i = 0; i < len; ++i) {
            char c = fluxChannelName.charAt(i);
            if (c == '.' || c == '#' || c == '*') {
                throw new IllegalArgumentException("Flux channel name '" + fluxChannelName + "' contains a special character '" + c + "'");
            }
        }
    }

    /**
     * Converts a user name into a routing key. The name {@code *} maps to the
     * broadcast routing key; every other name is used unchanged.
     *
     * @param name the user name
     * @return the routing key for that user
     */
    public String usernameToRoutingKey(String name) {
        if (name.equals("*")) {
            return EVERYONE;
        }
        return name;
    }

    /**
     * Schedules {@link #connectToChannelSync(String)} on the executor.
     * Failures are logged rather than propagated.
     *
     * @param channel the Flux channel to connect to
     */
    @Override
    public void connectToChannel(final String channel) {
        this.executor.execute(new Runnable(){

            @Override
            public void run() {
                try {
                    RabbitMQMessageConnector.this.connectToChannelSync(channel);
                }
                catch (Exception e) {
                    console.log(e);
                }
            }
        });
    }

    /**
     * Connects to a Flux channel unless already connected: checks the join
     * permission, binds the inbox to the channel's routing pattern and records
     * the channel as connected.
     *
     * @param channelName the Flux channel to connect to
     * @throws Exception if the permission check, name validation or binding fails
     */
    @Override
    public void connectToChannelSync(String channelName) throws Exception {
        boolean notifyNeeded = false;
        synchronized (this) {
            if (!this.isConnected(channelName)) {
                this.permissions.checkChannelJoin(channelName);
                String topic = RabbitMQMessageConnector.channelNameToTopicPattern(channelName);
                this.channel.queueBind(this.inbox, this.outbox, topic);
                this.connectedChannels.add(channelName);
                notifyNeeded = true;
            }
        }
        // The notification is issued after the lock has been released.
        if (notifyNeeded) {
            this.notifyChannelConnected(channelName);
        }
    }

    /**
     * Disconnects from a Flux channel if currently connected: removes the
     * inbox binding for the channel's routing pattern and forgets the channel.
     *
     * @param channelName the Flux channel to disconnect from
     * @throws Exception if name validation or unbinding fails
     */
    @Override
    public void disconnectFromChannelSync(String channelName) throws Exception {
        boolean notifyNeeded = false;
        synchronized (this) {
            if (this.isConnected(channelName)) {
                String topic = RabbitMQMessageConnector.channelNameToTopicPattern(channelName);
                this.channel.queueUnbind(this.inbox, this.outbox, topic);
                this.connectedChannels.remove(channelName);
                notifyNeeded = true;
            }
        }
        // The notification is issued after the lock has been released.
        if (notifyNeeded) {
            this.notifyChannelDisconnected(channelName);
        }
    }

    /**
     * Schedules {@link #disconnectFromChannelSync(String)} on the executor.
     * Failures are logged rather than propagated.
     *
     * @param channel the Flux channel to disconnect from
     */
    @Override
    public void disconnectFromChannel(final String channel) {
        this.executor.execute(new Runnable(){

            @Override
            public void run() {
                try {
                    RabbitMQMessageConnector.this.disconnectFromChannelSync(channel);
                }
                catch (Exception e) {
                    console.log(e);
                }
            }
        });
    }

    /**
     * @param channel the Flux channel name
     * @return {@code true} if this connector is recorded as connected to the channel
     */
    @Override
    public synchronized boolean isConnected(String channel) {
        return this.connectedChannels.contains(channel);
    }

    /**
     * Sends a message using the delivery type registered for its message type.
     *
     * @param messageType the type of the message
     * @param message the message payload
     * @throws Exception if the delivery fails
     */
    @Override
    public void send(String messageType, JSONObject message) throws Exception {
        this.deliveryTypes.get(messageType).send(this, messageType, message);
    }

    /**
     * Closes the broker connection, if there is one. An {@link IOException}
     * raised while closing is logged; the connection is dropped either way.
     */
    @Override
    public synchronized void disconnect() {
        if (this.connection != null) {
            try {
                this.connection.close();
            }
            catch (IOException e) {
                console.log(e);
            }
            this.connection = null;
        }
    }

    /**
     * @return {@code true} if a connection exists and reports itself as open
     */
    @Override
    public synchronized boolean isConnected() {
        return this.connection != null && this.connection.isOpen();
    }

    /**
     * @return the configuration this connector was created with
     */
    @Override
    public FluxConfig getConfig() {
        return this.conf;
    }

    /**
     * Serializes a message as UTF-8 encoded JSON with the fields {@code type},
     * {@code origin} (the name of this connector's inbox) and {@code data}.
     *
     * @param messageType the type of the message
     * @param data the message payload
     * @return the encoded message bytes
     * @throws Exception if the message cannot be built or written
     */
    byte[] encode(String messageType, JSONObject data) throws Exception {
        JSONObject message = new JSONObject()
                .put("type", messageType)
                .put("origin", this.inbox)
                .put("data", data);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // Closing the writer flushes it, so the buffer is complete afterwards.
        try (Writer out = new OutputStreamWriter(bytes, StandardCharsets.UTF_8)) {
            message.write(out);
        }
        return bytes.toByteArray();
    }
}

