EventProcessorExecutor.java

/*
 * Copyright (c) 2021 Mārtiņš Avots (Martins Avots) and others
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License 2.0, which is available at
 * http://www.eclipse.org/legal/epl-2.0, or the MIT License,
 * which is available at https://spdx.org/licenses/MIT.html.
 *
 * SPDX-License-Identifier: EPL-2.0 OR MIT
 */
package net.splitcells.dem.execution;

import net.splitcells.dem.resource.communication.Closeable;
import net.splitcells.dem.resource.communication.Flushable;

import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Executes registered {@link EventProcessor}s one after another on a single
 * dedicated worker thread.
 *
 * <p>Processors are queued via {@link #register(EventProcessor)} and taken from
 * the queue in insertion order. The worker is created by {@link #start()} and
 * terminated by {@link #stopAndWaitForExit()} or {@link #close()}.
 *
 * <p>Thread-safety: the life cycle methods ({@link #start()},
 * {@link #stopAndWaitForExit()}, {@link #close()}, {@link #flush()}) are
 * serialized via this object's monitor. {@link #register(EventProcessor)} and
 * {@link #executeNextTask()} do not take the monitor and rely on the
 * thread-safety of the underlying {@link LinkedBlockingQueue}.
 */
@Deprecated
public class EventProcessorExecutor implements Closeable, Flushable {
    /**
     * Interval in milliseconds at which {@link #flush()} re-checks the task queue.
     */
    private static final long FLUSH_POLL_INTERVAL_MILLIS = 500L;

    /**
     * Creates a new executor, that is not started yet.
     *
     * @return a new, idle executor without any registered processors
     */
    public static EventProcessorExecutor eventProcessorExecutor() {
        return new EventProcessorExecutor();
    }

    /**
     * The worker thread, if one was created by {@link #start()} and not yet
     * joined by {@link #stopAndWaitForExit()}. Guarded by this object's monitor.
     */
    private Optional<Thread> executor = Optional.empty();
    /**
     * Signals the worker to keep running. It is volatile, because it is written
     * by the controlling thread and read by the worker thread without any
     * common lock.
     */
    private volatile boolean enabled = false;
    private final LinkedBlockingQueue<EventProcessor> tasks = new LinkedBlockingQueue<>();

    private EventProcessorExecutor() {
    }

    /**
     * Starts the worker thread, that processes the queued tasks.
     *
     * <p>Calling this method while an enabled worker is still alive does nothing.
     * Otherwise a second worker would be started and the reference to the first
     * one would be lost, so that it could never be joined.
     */
    public synchronized void start() {
        if (enabled && executor.filter(Thread::isAlive).isPresent()) {
            return;
        }
        enabled = true;
        final Thread worker = new Thread(this::runWorker);
        executor = Optional.of(worker);
        worker.start();
    }

    /**
     * Requests the worker to stop and blocks until its thread has terminated.
     *
     * <p>The worker is interrupted, in order to wake it up, in case it is blocked
     * on the empty task queue. Tasks still present in the queue are not processed
     * by this method. If no worker is present, only the enabled flag is reset.
     *
     * @throws RuntimeException if the calling thread is interrupted while waiting
     *                          for the worker. The interrupt status of the calling
     *                          thread is restored before the exception is thrown
     *                          and the worker reference is kept in this case.
     */
    public synchronized void stopAndWaitForExit() {
        enabled = false;
        if (executor.isPresent()) {
            final Thread worker = executor.get();
            worker.interrupt();
            try {
                worker.join();
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller, as the checked exception is hidden.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        executor = Optional.empty();
    }

    /**
     * Takes the next task from the queue and processes it on the calling thread.
     * Blocks while the queue is empty.
     *
     * <p>If the calling thread is interrupted while waiting, the method returns
     * without processing a task and the interrupt status of the thread is restored,
     * so that the caller can react to it.
     */
    public void executeNextTask() {
        try {
            processNextTask();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Queues the given processor for execution.
     *
     * @param processor the processor to be executed. The underlying queue rejects
     *                  {@code null} with a {@link NullPointerException}.
     */
    public void register(EventProcessor processor) {
        tasks.add(processor);
    }

    /**
     * Waits via {@link #flush()} until the task queue is empty and
     * stops the worker thread afterwards.
     */
    @Override
    public synchronized void close() {
        flush();
        stopAndWaitForExit();
    }

    /**
     * Blocks until the task queue is empty.
     *
     * <p>HACK The queue is polled while this object's monitor is held. Therefore,
     * concurrent calls to the other synchronized methods of this object are blocked
     * until this method returns. {@link #register(EventProcessor)} is not
     * synchronized and can still add tasks in the meantime.
     *
     * <p>This method only checks the queue: a task, that was already taken from
     * the queue, may still be running when this method returns. If the queue is
     * not empty and nothing takes tasks from it (i.e. no worker is running),
     * this method keeps waiting.
     *
     * @throws RuntimeException if the calling thread is interrupted while waiting.
     *                          The interrupt status of the calling thread is
     *                          restored before the exception is thrown.
     */
    @Override
    public synchronized void flush() {
        try {
            while (!tasks.isEmpty()) {
                Thread.sleep(FLUSH_POLL_INTERVAL_MILLIS);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller, as the checked exception is hidden.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Body of the worker thread: processes tasks until the executor is disabled.
     */
    private void runWorker() {
        while (enabled) {
            try {
                processNextTask();
            } catch (InterruptedException ignored) {
                // The interrupt is only the wake-up signal of stopAndWaitForExit.
                // The flag is intentionally not restored here, as otherwise the next
                // take would fail immediately. The loop condition decides, whether to exit.
            }
        }
    }

    /**
     * Waits for the next task and processes it on the calling thread.
     *
     * @throws InterruptedException if the calling thread is interrupted while
     *                              waiting for a task
     */
    private void processNextTask() throws InterruptedException {
        tasks.take().processEvents();
    }
}