/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.scada.base.pipe.internal;

import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileVisitor;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.eclipse.scada.base.pipe.PipeService;
import org.eclipse.scada.base.pipe.Producer;
import org.eclipse.scada.base.pipe.Worker;
import org.eclipse.scada.base.pipe.WorkerAlreadyCreated;
import org.eclipse.scada.base.pipe.WorkerHandle;
import org.eclipse.scada.base.pipe.internal.WorkerThread;
import org.eclipse.scada.utils.io.RecursiveDeleteVisitor;
import org.eclipse.scada.utils.str.Tables;
import org.osgi.framework.FrameworkUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PipeServiceImpl
implements PipeService {
    private static final Logger logger = LoggerFactory.getLogger(PipeServiceImpl.class);
    private final File storage;
    private final AtomicLong counter = new AtomicLong();
    private final Map<String, WorkerThread> workers = new HashMap<String, WorkerThread>();
    private boolean started;
    private WorkerHandle testWorker;
    private static final Pattern FILE_PATTERN = Pattern.compile("([0-9a-z]+)-([0-9a-z]+)-([0-9a-z]+)\\.evt", 2);

    public PipeServiceImpl() {
        this(PipeServiceImpl.getDefaultDirectory());
    }

    public PipeServiceImpl(File storage) {
        if (!storage.isDirectory()) {
            throw new IllegalStateException(String.format("'%s' is not a directory", storage));
        }
        this.storage = storage;
    }

    public synchronized void start() {
        if (this.started) {
            return;
        }
        this.started = true;
    }

    public synchronized void stop() {
        if (!this.started) {
            return;
        }
        this.started = false;
        for (WorkerThread thread : this.workers.values()) {
            thread.close();
        }
        this.workers.clear();
    }

    public static File getDefaultDirectory() {
        File fdir;
        String dir = System.getProperty("org.eclipse.scada.base.pipe.storage", null);
        if (dir == null) {
            File base = FrameworkUtil.getBundle(PipeServiceImpl.class).getDataFile(null);
            if (base == null) {
                throw new IllegalStateException("Unable to get root folder of bundle data directory");
            }
            fdir = new File(base, "storage");
        } else {
            fdir = new File(dir);
        }
        if (!fdir.exists()) {
            fdir.mkdirs();
        }
        if (!fdir.isDirectory()) {
            throw new IllegalStateException(String.format("'%s' is not a valid directory or could not be created", fdir));
        }
        return fdir;
    }

    @Override
    public Producer createProducer(final String pipeName) {
        return new Producer(){

            @Override
            public void publish(byte[] data, int retries) throws IOException {
                PipeServiceImpl.this.publishEvent(pipeName, data, retries);
            }
        };
    }

    protected void publishEvent(String pipeName, byte[] data, int retries) throws IOException {
        String name = this.encode(pipeName);
        MetaInfo info = new MetaInfo(0L, retries);
        File file = this.makeFile(name, info);
        logger.trace("Block file: {}", (Object)file);
        if (!file.createNewFile()) {
            throw new IllegalStateException(String.format("File '%s' already exists", file));
        }
        File partFile = new File(String.valueOf(file.getAbsolutePath()) + ".part");
        logger.trace("Part file: {}", (Object)partFile);
        try {
            Files.write(partFile.toPath(), data, StandardOpenOption.CREATE_NEW);
            Files.move(partFile.toPath(), file.toPath(), StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
            logger.trace("Created data file: {} bytes", (Object)data.length);
        }
        finally {
            if (partFile.exists()) {
                logger.trace("Delete part file: {}", (Object)partFile);
                partFile.delete();
            }
        }
        this.notifyConsumer(pipeName);
    }

    private synchronized void notifyConsumer(String pipeName) {
        WorkerThread thread = this.workers.get(pipeName);
        if (thread != null) {
            logger.debug("Notify consumer thread: {} -> {}", (Object)pipeName, (Object)thread);
            thread.notifyNewEvent();
        }
    }

    File makeFile(String pipeName, MetaInfo info) {
        long retry = info == null ? 0L : info.retry;
        long time = info == null || info.timestamp <= 0L ? System.currentTimeMillis() : info.timestamp;
        long value = this.counter.incrementAndGet();
        File queueDir = this.getQueueDir(pipeName);
        return new File(queueDir, String.format("%016x-%016x-%08x.evt", time, value, retry));
    }

    private File getQueueDir(String pipeName) {
        File queueDir = new File(this.storage, pipeName);
        if (!queueDir.exists()) {
            queueDir.mkdir();
        }
        if (!queueDir.isDirectory()) {
            throw new IllegalStateException(String.format("Unable to create queue dir: '%s'", queueDir));
        }
        return queueDir;
    }

    private String encode(String name) {
        try {
            return URLEncoder.encode(name, "UTF-8");
        }
        catch (UnsupportedEncodingException unsupportedEncodingException) {
            return name;
        }
    }

    @Override
    public synchronized WorkerHandle createWorker(final String pipeName, Worker worker) throws WorkerAlreadyCreated {
        if (this.workers.containsKey(pipeName)) {
            throw new WorkerAlreadyCreated(String.format("The consumer '%s' has already been created", pipeName));
        }
        WorkerHandle handle = new WorkerHandle(){

            @Override
            public void close() {
                PipeServiceImpl.this.performClose(pipeName);
            }
        };
        WorkerThread thread = new WorkerThread(this, worker, pipeName);
        thread.setName("PipeWorkerThread/" + pipeName);
        this.workers.put(pipeName, thread);
        thread.start();
        return handle;
    }

    /*
     * Unable to fully structure code
     */
    public Long fetchNextEvents(String pipeName, int max, List<File> result) {
        dir = this.getQueueDir(pipeName);
        files = dir.listFiles();
        Arrays.sort(files);
        now = System.currentTimeMillis();
        endTime = null;
        var12_8 = files;
        var11_9 = files.length;
        var10_10 = 0;
        while (var10_10 < var11_9) {
            block8: {
                file = var12_8[var10_10];
                if (!file.isFile() || !file.canRead()) break block8;
                if (!file.getName().endsWith(".evt") || file.length() <= 0L) ** GOTO lbl28
                PipeServiceImpl.logger.trace("Preparing: {}", file);
                info = PipeServiceImpl.parse(file.getName());
                if (info == null) {
                    PipeServiceImpl.logger.info("Broken file name: {}", (Object)file.getName());
                } else {
                    timestamp = info.timestamp;
                    if (timestamp > now) {
                        if (endTime == null || endTime > timestamp) {
                            if (PipeServiceImpl.logger.isTraceEnabled()) {
                                PipeServiceImpl.logger.trace("Setting end time to {} (in {} ms)", (Object)timestamp, (Object)(timestamp - now));
                            }
                            endTime = timestamp;
                        }
                        PipeServiceImpl.logger.debug("Postponed item: {}", (Object)file.getName());
                    } else {
                        result.add((File)file);
lbl28:
                        // 2 sources

                        if (max > 0 && result.size() > max) {
                            return endTime;
                        }
                    }
                }
            }
            ++var10_10;
        }
        return endTime;
    }

    static MetaInfo parse(String n) {
        Matcher m = FILE_PATTERN.matcher(n);
        if (!m.matches()) {
            return null;
        }
        long timestamp = Long.parseLong(m.group(1), 16);
        long retry = Long.parseLong(m.group(3), 16);
        return new MetaInfo(timestamp, retry);
    }

    public void processNextEvent(Worker worker, String pipeName) throws IOException {
        File dir = this.getQueueDir(pipeName);
        Object[] files = dir.listFiles();
        Arrays.sort(files);
        Object[] objectArray = files;
        int n = files.length;
        int n2 = 0;
        while (n2 < n) {
            Object file = objectArray[n2];
            if (((File)file).isFile() && ((File)file).canRead() && ((File)file).getName().endsWith(".evt") && ((File)file).length() > 0L) {
                logger.trace("Processing: {}", file);
                byte[] data = Files.readAllBytes(((File)file).toPath());
                try {
                    worker.work(data);
                }
                catch (Exception e) {
                    logger.info("Worker failed", (Throwable)e);
                }
                Files.delete(((File)file).toPath());
            }
            ++n2;
        }
    }

    protected synchronized void performClose(String pipeName) {
        WorkerThread thread = this.workers.remove(pipeName);
        if (thread != null) {
            thread.close();
        }
    }

    public void pipes() {
        File[] fileArray = this.storage.listFiles();
        int n = fileArray.length;
        int n2 = 0;
        while (n2 < n) {
            File file = fileArray[n2];
            if (file.isDirectory()) {
                String pipeName = this.decode(file.getName());
                System.out.println(pipeName);
            }
            ++n2;
        }
    }

    public void list(String pipeName) {
        File dir = this.getQueueDir(pipeName);
        LinkedList rows = new LinkedList();
        Object[] files = dir.listFiles();
        Arrays.sort(files);
        Object[] objectArray = files;
        int n = files.length;
        int n2 = 0;
        while (n2 < n) {
            MetaInfo info;
            Object f = objectArray[n2];
            if (((File)f).isFile() && ((File)f).length() > 0L && (info = PipeServiceImpl.parse(((File)f).getName())) != null) {
                LinkedList<String> row = new LinkedList<String>();
                row.add(((File)f).getName());
                row.add("" + ((File)f).length());
                row.add(String.format("%tc", new Date(((File)f).lastModified())));
                row.add(String.format("%tc", new Date(info.timestamp)));
                row.add("" + info.retry);
                rows.add(row);
            }
            ++n2;
        }
        Tables.showTable((PrintStream)System.out, Arrays.asList("Name", "Size", "Entry Date", "Schedule", "Retry"), rows, (int)3);
    }

    private String decode(String name) {
        try {
            return URLDecoder.decode(name, "UTF-8");
        }
        catch (UnsupportedEncodingException unsupportedEncodingException) {
            return name;
        }
    }

    public void startTestWorker(String pipeName) throws WorkerAlreadyCreated {
        final PrintStream os = System.out;
        this.testWorker = this.createWorker(pipeName, new Worker(){

            @Override
            public void work(byte[] data) {
                os.format("Pipe event - size: %s, data: %s%n", data.length, data);
                os.flush();
                String str = StandardCharsets.UTF_8.decode(ByteBuffer.wrap(data)).toString();
                os.format("\tString: %s%n", str);
                os.flush();
                String[] toks = str.split(" ");
                if (toks.length > 1 && toks[0].equals("sleep")) {
                    try {
                        os.println("Sleeping...");
                        os.flush();
                        Thread.sleep(Integer.parseInt(toks[1]) * 1000);
                        os.println("Sleeping... done!");
                        os.flush();
                    }
                    catch (InterruptedException | NumberFormatException e) {
                        e.printStackTrace(os);
                    }
                } else if (toks.length > 0 && toks[0].equals("error")) {
                    throw new RuntimeException("Test Exception");
                }
            }
        });
    }

    public void closeTestWorker() {
        if (this.testWorker != null) {
            this.testWorker.close();
        }
    }

    public void testPublish(String pipeName, String data) throws IOException {
        this.publishEvent(pipeName, StandardCharsets.UTF_8.encode(data).array(), 2);
    }

    public void drop(String pipeName) throws IOException {
        File ndir;
        File dir = this.getQueueDir(pipeName);
        int i = 0;
        do {
            ndir = new File(String.valueOf(dir.getName()) + "-" + i);
            ++i;
        } while (ndir.exists());
        Files.move(dir.toPath(), ndir.toPath(), StandardCopyOption.ATOMIC_MOVE);
        Files.walkFileTree(ndir.toPath(), (FileVisitor<? super Path>)new RecursiveDeleteVisitor());
    }

    static class MetaInfo {
        long timestamp;
        long retry;

        public MetaInfo(long timestamp, long retry) {
            this.timestamp = timestamp;
            this.retry = retry;
        }
    }
}

