/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.gyrex.jobs.internal.worker;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.eclipse.core.runtime.IProgressMonitor;
import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.Status;
import org.eclipse.core.runtime.jobs.IJobChangeListener;
import org.eclipse.core.runtime.jobs.Job;
import org.eclipse.gyrex.cloud.services.queue.IMessage;
import org.eclipse.gyrex.cloud.services.queue.IQueue;
import org.eclipse.gyrex.cloud.services.queue.IQueueService;
import org.eclipse.gyrex.context.IRuntimeContext;
import org.eclipse.gyrex.context.registry.IRuntimeContextRegistry;
import org.eclipse.gyrex.jobs.IJobContext;
import org.eclipse.gyrex.jobs.internal.JobsActivator;
import org.eclipse.gyrex.jobs.internal.JobsDebug;
import org.eclipse.gyrex.jobs.internal.manager.JobManagerImpl;
import org.eclipse.gyrex.jobs.internal.worker.JobContext;
import org.eclipse.gyrex.jobs.internal.worker.JobInfo;
import org.eclipse.gyrex.jobs.internal.worker.JobStateSynchronizer;
import org.eclipse.gyrex.jobs.manager.IJobManager;
import org.eclipse.gyrex.jobs.provider.JobProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WorkerEngine
extends Job {
    private static final long INITIAL_SLEEP_TIME = TimeUnit.SECONDS.toMillis(30L);
    private static final long MAX_SLEEP_TIME = TimeUnit.MINUTES.toMillis(5L);
    private static final Logger LOG = LoggerFactory.getLogger(WorkerEngine.class);
    private long engineSleepTime = INITIAL_SLEEP_TIME;

    public WorkerEngine() {
        super("Gyrex Worker Engine Job");
        this.setSystem(true);
        this.setPriority(30);
    }

    private JobContext createContext(JobInfo info) {
        IRuntimeContextRegistry contextRegistry = (IRuntimeContextRegistry)JobsActivator.getInstance().getService(IRuntimeContextRegistry.class);
        IRuntimeContext context = contextRegistry.get(info.getContextPath());
        if (context == null) {
            throw new IllegalStateException(String.format("Context %s not available!", info.getContextPath().toString()));
        }
        return new JobContext(context, info);
    }

    private Job createJob(JobInfo info, IJobContext jobContext) throws Exception {
        JobProvider provider = JobsActivator.getInstance().getJobProviderRegistry().getProvider(info.getJobTypeId());
        if (provider == null) {
            throw new IllegalStateException(String.format("Job type %s not available!", info.getJobTypeId()));
        }
        Job job = provider.createJob(info.getJobTypeId(), jobContext);
        if (job == null) {
            throw new IllegalStateException(String.format("Provider %s did not create job of type %s!", provider.toString(), info.getJobTypeId()));
        }
        return job;
    }

    private IStatus doRun(IProgressMonitor monitor) {
        try {
            Job job;
            JobInfo info;
            IQueue queue = this.getDefaultQueue();
            if (queue == null) {
                LOG.warn("No queue available for reading scheduled jobs to execute. Please check engine configuration.");
                return Status.CANCEL_STATUS;
            }
            HashMap<String, Long> requestProperties = new HashMap<String, Long>(1);
            requestProperties.put("queue.message.receive.timeout", this.getReceiveTimeout());
            List messages = queue.receiveMessages(1, requestProperties);
            if (messages.isEmpty()) {
                return Status.OK_STATUS;
            }
            IMessage message = (IMessage)messages.get(0);
            try {
                info = JobInfo.parse(message);
            }
            catch (IOException e) {
                LOG.warn("Invalid job info in message {}: {}", (Object)message, (Object)ExceptionUtils.getRootCauseMessage((Throwable)e));
                queue.deleteMessage(message);
                return Status.CANCEL_STATUS;
            }
            JobContext jobContext = this.createContext(info);
            try {
                job = this.createJob(info, jobContext);
            }
            catch (Exception e) {
                LOG.warn("Error creating job {}: {}", (Object)message, (Object)ExceptionUtils.getRootCauseMessage((Throwable)e));
                queue.deleteMessage(message);
                try {
                    JobManagerImpl jobManager = (JobManagerImpl)jobContext.getContext().get(IJobManager.class);
                    jobManager.setResult(info.getJobId(), (IStatus)new Status(4, "org.eclipse.gyrex.jobs", String.format("Error creating job: %s", e.getMessage()), (Throwable)e), System.currentTimeMillis());
                }
                catch (Exception jobManagerException) {
                    LOG.warn("Error updating job result for job {}: {}", (Object)info.getJobId(), (Object)ExceptionUtils.getRootCauseMessage((Throwable)jobManagerException));
                }
                return Status.CANCEL_STATUS;
            }
            job.addJobChangeListener((IJobChangeListener)new JobStateSynchronizer(job, jobContext));
            if (queue.deleteMessage(message)) {
                job.schedule();
            }
        }
        catch (IllegalStateException e) {
            LOG.warn("Unable to check queue for new jobs. System does not seem to be ready. {}", (Object)ExceptionUtils.getRootCauseMessage((Throwable)e));
            return Status.CANCEL_STATUS;
        }
        return Status.OK_STATUS;
    }

    protected IQueue getDefaultQueue() {
        IQueueService queueService = JobsActivator.getInstance().getQueueService();
        IQueue queue = queueService.getQueue("gyrex.jobs.queue.default", null);
        if (queue == null) {
            queueService.createQueue("gyrex.jobs.queue.default", null);
        }
        return queueService.getQueue("gyrex.jobs.queue.default", null);
    }

    protected long getReceiveTimeout() {
        return TimeUnit.MINUTES.toMillis(1L);
    }

    protected IStatus run(IProgressMonitor monitor) {
        try {
            IStatus status = this.doRun(monitor);
            this.engineSleepTime = !status.isOK() ? Math.min(this.engineSleepTime * 2L, MAX_SLEEP_TIME) : INITIAL_SLEEP_TIME;
            IStatus iStatus = status;
            return iStatus;
        }
        finally {
            if (!monitor.isCanceled()) {
                if (JobsDebug.workerEngine) {
                    LOG.debug("Rescheduling worker engine to run again in {} seconds", (Object)TimeUnit.MILLISECONDS.toSeconds(this.engineSleepTime));
                }
                this.schedule(this.engineSleepTime);
            }
        }
    }
}

