Support interrupting the kernel. See #33

This commit is contained in:
SpencerPark 2018-09-30 22:53:43 -04:00
parent d29537d3d3
commit 99d916fa58
5 changed files with 84 additions and 14 deletions

View File

@ -96,7 +96,7 @@ public class IJava {
String contents = new String(Files.readAllBytes(connectionFile));
JupyterSocket.JUPYTER_LOGGER.setLevel(Level.WARNING);
JupyterSocket.JUPYTER_LOGGER.setLevel(Level.ALL);
KernelConnectionProperties connProps = KernelConnectionProperties.parse(contents);
JupyterConnection connection = new JupyterConnection(connProps);

View File

@ -28,7 +28,6 @@ import io.github.spencerpark.jupyter.kernel.BaseKernel;
import io.github.spencerpark.jupyter.kernel.LanguageInfo;
import io.github.spencerpark.jupyter.kernel.ReplacementOptions;
import io.github.spencerpark.jupyter.kernel.display.DisplayData;
import io.github.spencerpark.jupyter.kernel.magic.*;
import io.github.spencerpark.jupyter.kernel.magic.registry.Magics;
import io.github.spencerpark.jupyter.kernel.util.CharPredicate;
import io.github.spencerpark.jupyter.kernel.util.GlobFinder;
@ -38,10 +37,7 @@ import io.github.spencerpark.jupyter.messages.Header;
import jdk.jshell.*;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Base64;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
@ -171,6 +167,8 @@ public class JavaKernel extends BaseKernel {
return formatEvalException((EvalException) e);
} else if (e instanceof EvaluationTimeoutException) {
return formatEvaluationTimeoutException((EvaluationTimeoutException) e);
} else if (e instanceof EvaluationInterruptedException) {
return formatEvaluationInterruptedException((EvaluationInterruptedException) e);
} else {
fmt.addAll(super.formatError(e));
}
@ -245,6 +243,14 @@ public class JavaKernel extends BaseKernel {
return fmt;
}
private List<String> formatEvaluationInterruptedException(EvaluationInterruptedException e) {
List<String> fmt = new ArrayList<>(this.errorStyler.primaryLines(e.getSource()));
fmt.add(this.errorStyler.secondary("Evaluation interrupted."));
return fmt;
}
public Object evalRaw(String expr) throws Exception {
expr = this.magicsTransformer.transformMagics(expr);
@ -338,4 +344,9 @@ public class JavaKernel extends BaseKernel {
public void onShutdown(boolean isRestarting) {
this.evaluator.shutdown();
}
@Override
public void interrupt() {
this.evaluator.interrupt();
}
}

View File

@ -101,8 +101,17 @@ public class CodeEvaluator {
if (event.causeSnippet() == null) {
JShellException e = event.exception();
if (e != null) {
if (e instanceof EvalException && IJavaExecutionControl.EXECUTION_TIMEOUT_NAME.equals(((EvalException) e).getExceptionClassName()))
throw new EvaluationTimeoutException(executionControl.getTimeoutDuration(), executionControl.getTimeoutUnit(), code.trim());
if (e instanceof EvalException) {
switch (((EvalException) e).getExceptionClassName()) {
case IJavaExecutionControl.EXECUTION_TIMEOUT_NAME:
throw new EvaluationTimeoutException(executionControl.getTimeoutDuration(), executionControl.getTimeoutUnit(), code.trim());
case IJavaExecutionControl.EXECUTION_INTERRUPTED_NAME:
throw new EvaluationInterruptedException(code.trim());
default:
throw e;
}
}
throw e;
}
@ -136,7 +145,11 @@ public class CodeEvaluator {
}
public void interrupt() {
this.shell.stop();
IJavaExecutionControl executionControl =
this.executionControlProvider.getRegisteredControlByID(this.executionControlID);
if (executionControl != null)
executionControl.interrupt();
}
public void shutdown() {

View File

@ -0,0 +1,19 @@
package io.github.spencerpark.ijava.execution;
public class EvaluationInterruptedException extends Exception {
private final String source;
public EvaluationInterruptedException(String source) {
this.source = source;
}
public String getSource() {
return source;
}
@Override
public String getMessage() {
return String.format("Evaluator was interrupted while executing: '%s'",
this.source);
}
}

View File

@ -41,10 +41,18 @@ import java.util.concurrent.atomic.AtomicInteger;
public class IJavaExecutionControl extends DirectExecutionControl {
/**
* A special "class name" for a {@link jdk.jshell.spi.ExecutionControl.UserException} such that it may be
* identified after serialization into an {@link jdk.jshell.EvalException} via {@link EvalException#getExceptionClassName()}.
* identified after serialization into an {@link jdk.jshell.EvalException} via {@link
* EvalException#getExceptionClassName()}.
*/
public static final String EXECUTION_TIMEOUT_NAME = "Execution Timeout"; // Has spaces to not collide with a class name
/**
* A special "class name" for a {@link jdk.jshell.spi.ExecutionControl.UserException} such that it may be
* identified after serialization into an {@link jdk.jshell.EvalException} via {@link
* EvalException#getExceptionClassName()}
*/
public static final String EXECUTION_INTERRUPTED_NAME = "Execution Interrupted";
private static final Object NULL = new Object();
private static final AtomicInteger EXECUTOR_THREAD_ID = new AtomicInteger(0);
@ -54,6 +62,7 @@ public class IJavaExecutionControl extends DirectExecutionControl {
private final long timeoutTime;
private final TimeUnit timeoutUnit;
private final ConcurrentMap<String, Future<Object>> running = new ConcurrentHashMap<>();
private final Map<String, Object> results = new ConcurrentHashMap<>();
public IJavaExecutionControl() {
@ -81,17 +90,28 @@ public class IJavaExecutionControl extends DirectExecutionControl {
return result == NULL ? null : result;
}
private Object execute(Method doitMethod) throws TimeoutException, Exception {
private Object execute(String key, Method doitMethod) throws TimeoutException, Exception {
Future<Object> runningTask = this.executor.submit(() -> doitMethod.invoke(null));
this.running.put(key, runningTask);
try {
if (this.timeoutTime > 0)
return runningTask.get(this.timeoutTime, this.timeoutUnit);
return runningTask.get();
} catch (CancellationException e) {
// If canceled this means that stop() was invoked in which case the protocol is to
// throw an ExecutionControl.StoppedException.
throw new StoppedException();
// If canceled this means that stop() or interrupt() was invoked.
if (this.executor.isShutdown())
// If the executor is shutdown, the situation is the former in which
// case the protocol is to throw an ExecutionControl.StoppedException.
throw new StoppedException();
else
// The execution was purposely interrupted.
throw new UserException(
"Execution interrupted.",
EXECUTION_INTERRUPTED_NAME,
new StackTraceElement[]{} // The trace is irrelevant because it is in the kernel space not the user space so leave it blank.
);
} catch (ExecutionException e) {
// The execution threw an exception. The actual exception is the cause of the ExecutionException.
Throwable cause = e.getCause();
@ -110,6 +130,8 @@ public class IJavaExecutionControl extends DirectExecutionControl {
EXECUTION_TIMEOUT_NAME,
new StackTraceElement[]{} // The trace is irrelevant because it is in the kernel space not the user space so leave it blank.
);
} finally {
this.running.remove(key, runningTask);
}
}
@ -125,12 +147,17 @@ public class IJavaExecutionControl extends DirectExecutionControl {
*/
@Override
protected String invoke(Method doitMethod) throws Exception {
Object value = this.execute(doitMethod);
String id = UUID.randomUUID().toString();
Object value = this.execute(id, doitMethod);
this.results.put(id, value);
return id;
}
public void interrupt() {
this.running.forEach((id, future) ->
future.cancel(true));
}
@Override
public void stop() throws EngineTerminationException, InternalException {
this.executor.shutdownNow();