/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.gyrex.cloud.internal.queue;

import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.eclipse.core.runtime.IPath;
import org.eclipse.gyrex.cloud.internal.queue.Message;
import org.eclipse.gyrex.cloud.internal.queue.QueueOperationFailedException;
import org.eclipse.gyrex.cloud.internal.zk.IZooKeeperLayout;
import org.eclipse.gyrex.cloud.internal.zk.ZooKeeperGate;
import org.eclipse.gyrex.cloud.internal.zk.ZooKeeperMonitor;
import org.eclipse.gyrex.cloud.services.queue.IMessage;
import org.eclipse.gyrex.cloud.services.queue.IQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZooKeeperQueue
implements IQueue {
    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperQueue.class);
    private static final String PREFIX = "msg-";
    final String id;
    final IPath queuePath;

    public ZooKeeperQueue(String id) {
        this.id = id;
        this.queuePath = IZooKeeperLayout.PATH_QUEUES_ROOT.append(id);
    }

    @Override
    public IMessage consumeMessage(long timeout, TimeUnit unit) throws IllegalArgumentException, IllegalStateException, SecurityException, InterruptedException {
        if (timeout > 0L && unit == null) {
            throw new IllegalArgumentException("unit must not be null when timeout is specified");
        }
        long abortTime = timeout > 0L ? unit.toMillis(timeout) + System.currentTimeMillis() : 0L;
        while (true) {
            TreeMap<Long, String> queueChildren;
            try {
                queueChildren = this.readQueueChildren(null);
            }
            catch (Exception e) {
                if (e instanceof KeeperException.NoNodeException) {
                    throw new IllegalStateException(String.format("queue '%s' does not exist", this.id));
                }
                throw new QueueOperationFailedException(this.id, "CONSUME_MESSAGES", e);
            }
            if (queueChildren.size() > 0) {
                for (String childName : queueChildren.values()) {
                    Message message;
                    if (childName == null || (message = this.readQueueMessage(childName)) == null || message.isHidden() || !message.consume(false)) continue;
                    return message;
                }
            }
            if (abortTime <= 0L) {
                return null;
            }
            long diff = abortTime - System.currentTimeMillis();
            if (diff <= 0L) break;
            Thread.sleep(Math.max(diff / 2L, 250L));
        }
        return null;
    }

    @Override
    public boolean deleteMessage(IMessage message) throws IllegalArgumentException, IllegalStateException, SecurityException, NoSuchElementException {
        if (!(message instanceof Message) || !StringUtils.equals((String)this.id, (String)message.getQueueId())) {
            throw new IllegalArgumentException(String.format("Message '%s' was not received from this queue.", String.valueOf(message)));
        }
        return ((Message)message).delete(true);
    }

    private long getReceiveMessageTimeout(Map<String, ?> properties) {
        Object timeout;
        if (properties != null && (timeout = properties.get("queue.message.receive.timeout")) != null) {
            if (!Long.class.isAssignableFrom(timeout.getClass())) {
                throw new IllegalArgumentException(String.format("Property %s must be of type Long or long.", "queue.message.receive.timeout"));
            }
            return (Long)timeout;
        }
        Properties queueData = this.readQueueData();
        String queueTimeout = queueData.getProperty("queue.message.receive.timeout", null);
        return NumberUtils.toLong((String)queueTimeout, (long)30000L);
    }

    private TreeMap<Long, String> readQueueChildren(ZooKeeperMonitor monitor) throws InterruptedException, IllegalStateException, KeeperException {
        TreeMap<Long, String> childrenBySequenceNumber = new TreeMap<Long, String>();
        Collection<String> childNames = ZooKeeperGate.get().readChildrenNames(this.queuePath, monitor, null);
        for (String childName : childNames) {
            if (!StringUtils.startsWith((String)childName, (String)PREFIX)) {
                LOG.warn("Incorrect child name {} in queue {}.", new Object[]{childName, this.id});
                continue;
            }
            long sequenceNumber = NumberUtils.toLong((String)StringUtils.substring((String)childName, (int)PREFIX.length()), (long)-1L);
            if (sequenceNumber < 0L) {
                LOG.warn("Incorrect sequence number in child name {} in queue {}.", new Object[]{childName, this.id});
                continue;
            }
            childrenBySequenceNumber.put(sequenceNumber, childName);
        }
        return childrenBySequenceNumber;
    }

    private Properties readQueueData() {
        Properties queueData = new Properties();
        try {
            Stat stat = new Stat();
            byte[] record = ZooKeeperGate.get().readRecord(this.queuePath, stat);
            if (record == null) {
                return queueData;
            }
            queueData.load(new ByteArrayInputStream(record));
            return queueData;
        }
        catch (Exception e) {
            if (e instanceof KeeperException.NoNodeException) {
                throw new IllegalStateException(String.format("Queue '%s' has been removed!", this.id));
            }
            throw new QueueOperationFailedException(this.id, "READ_QUEUE_DATA", e);
        }
    }

    private Message readQueueMessage(String messageId) {
        byte[] record;
        Stat stat;
        block4: {
            stat = new Stat();
            record = ZooKeeperGate.get().readRecord(this.queuePath.append(messageId), stat);
            if (record != null) break block4;
            return null;
        }
        try {
            return new Message(messageId, this, record, stat);
        }
        catch (KeeperException.NoNodeException noNodeException) {
            return null;
        }
        catch (Exception e) {
            throw new QueueOperationFailedException(this.id, String.format("MESSAGE_READ(%s)", messageId), e);
        }
    }

    @Override
    public List<IMessage> receiveMessages(int maxNumberOfMessages, Map<String, ?> properties) throws IllegalArgumentException, IllegalStateException, SecurityException {
        if (maxNumberOfMessages < 0) {
            throw new IllegalArgumentException("maxNumberOfMessages must be greate than zero");
        }
        ArrayList<IMessage> messages = new ArrayList<IMessage>(maxNumberOfMessages);
        try {
            long receiveMessageTimeout = this.getReceiveMessageTimeout(properties);
            TreeMap<Long, String> queueChildren = this.readQueueChildren(null);
            for (String childName : queueChildren.values()) {
                Message message;
                if (childName == null || (message = this.readQueueMessage(childName)) == null || message.isHidden() || !message.receive(receiveMessageTimeout, false)) continue;
                messages.add(message);
            }
        }
        catch (Exception e) {
            if (e instanceof KeeperException.NoNodeException) {
                throw new IllegalStateException(String.format("queue '%s' does not exist", this.id));
            }
            throw new QueueOperationFailedException(this.id, "RECEIVE_MESSAGES", e);
        }
        return messages;
    }

    @Override
    public void sendMessage(byte[] messageBody) throws IllegalArgumentException, IllegalStateException, SecurityException {
        try {
            ZooKeeperGate.get().createPath(this.queuePath.append(PREFIX), CreateMode.PERSISTENT_SEQUENTIAL, new Message(this.id, messageBody).toByteArray());
        }
        catch (Exception e) {
            if (e instanceof KeeperException.NoNodeException) {
                throw new IllegalStateException(String.format("queue '%s' does not exist", this.id));
            }
            throw new QueueOperationFailedException(this.id, "SEND_MESSAGES", e);
        }
    }
}

