FEATURE AVAILABILITY STARTING FROM RELEASE 2.8.2
Introduction
JS7 - GraalVM Python Jobs are based on Oracle® GraalVM.
This article explains how a GraalVM Python Job can execute an external script in a subprocess and forward its stdout and stderr output to the JS7 logger in real time.
This enables GraalVM Python Jobs to integrate existing scripts, tools, or services while maintaining full control over log streaming and cancellation behavior.
Example
Include Script Example
For more information about Script Includes, see JS7 - Script Include.
The following example demonstrates how to process stdout and stderr output from a subprocess in parallel using the Python threading module:
import subprocess, threading

# The ManagedProcess class encapsulates the run and cancel functions and streams logs internally.
class ManagedProcess:

    @staticmethod
    def _stream_logs(stream, logger, prefix):
        # iter() calls readline() until it returns "" (EOF)
        for line in iter(stream.readline, ""):
            logger(f"{prefix}{line.rstrip()}")
        stream.close()

    @staticmethod
    def run(js7Step, cmd):
        if not cmd:
            raise ValueError("No command specified: 'cmd' must be a non-empty list")
        process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            bufsize=1
        )
        js7Step.setCancelableResource(process)
        # Defines the thread for live stdout
        stdout_thread = threading.Thread(
            target=ManagedProcess._stream_logs,
            args=(process.stdout, js7Step.getLogger().info, "[LIVE STDOUT] "),
            daemon=False
        )
        # Defines the thread for live stderr
        stderr_thread = threading.Thread(
            target=ManagedProcess._stream_logs,
            args=(process.stderr, js7Step.getLogger().error, "[LIVE STDERR] "),
            daemon=False
        )
        # Starts threads to read stdout and stderr streams
        stdout_thread.start()
        stderr_thread.start()
        # Waits until the process has finished
        rc = process.wait()
        # Ensures threads have finished reading
        stdout_thread.join()
        stderr_thread.join()
        return rc

    @staticmethod
    def cancel(js7Step):
        if js7Step is None:
            return
        # Gets the cancelable resource
        process = js7Step.getCancelableResource()
        if process is None:
            return
        # Kills the subprocess if it is still running
        try:
            if process.poll() is None:
                js7Step.getLogger().info(f"[ManagedProcess.cancel][kill]{process}...")
                process.kill()
        except Exception as e:
            js7Step.getLogger().error(f"[ManagedProcess.cancel][{process}]{e}")
        return process.wait()
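The reader-thread pattern used by ManagedProcess can be tried outside of JS7. The following is a minimal sketch, not part of the JS7 API: the helper names stream_lines and run_and_collect are hypothetical, and a plain Python list stands in for the JS7 logger. It assumes a standard CPython interpreter is available via sys.executable.

```python
import subprocess
import sys
import threading


def stream_lines(stream, sink, prefix):
    # Reads line by line until readline() returns "" (EOF)
    for line in iter(stream.readline, ""):
        sink(f"{prefix}{line.rstrip()}")
    stream.close()


def run_and_collect(cmd):
    # Hypothetical helper: collects lines in a list instead of a JS7 logger
    lines = []
    process = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        bufsize=1,
    )
    threads = [
        threading.Thread(target=stream_lines, args=(process.stdout, lines.append, "[OUT] ")),
        threading.Thread(target=stream_lines, args=(process.stderr, lines.append, "[ERR] ")),
    ]
    for t in threads:
        t.start()
    # Waits for the process, then for both readers to drain the pipes
    rc = process.wait()
    for t in threads:
        t.join()
    return rc, lines


# Child process that writes to both streams and exits with return code 3
child = "import sys; print('hello'); print('oops', file=sys.stderr); sys.exit(3)"
rc, lines = run_and_collect([sys.executable, "-c", child])
```

Reading both pipes in separate threads avoids a deadlock that can occur when one pipe fills up while the parent is blocked reading the other. The relative order of stdout and stderr lines is not guaranteed.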
Job Example
The following example includes the Python-ManagedProcessClass script in the job and starts the subprocess.
import os, shlex
##!include Python-ManagedProcessClass

class JS7Job(js7.Job):

    # Splits an argument string into a list, using Windows or POSIX shell rules depending on the platform
    @staticmethod
    def _split_args(arg_string):
        if os.name == "nt":
            return shlex.split(arg_string, posix=False)
        return shlex.split(arg_string, posix=True)

    def processOrder(self, js7Step):
        # Reads the workflow arguments
        executable_path = js7Step.getAllArgumentsAsNameValueMap().get("executable_path")  # e.g. '/usr/bin/python3', '/bin/sh'
        script_path = js7Step.getAllArgumentsAsNameValueMap().get("script_path")  # e.g. '/home/user/script.py', '/script.sh'
        arguments = js7Step.getAllArgumentsAsNameValueMap().get("arguments")  # e.g. 'my_arg_1 my_arg_2'
        script_arguments = js7Step.getAllArgumentsAsNameValueMap().get("script_arguments")  # e.g. 'my_arg_1 my_arg_2'
        # Creates the base command
        if executable_path:
            cmd = [executable_path]
        else:
            raise ValueError("No executable_path specified.")
        # Adds arguments that will be passed to the executable
        if arguments and arguments != "empty":
            cmd += self._split_args(arguments)
        if script_path and script_path != "empty":
            cmd.append(script_path)
        # Adds arguments that will be passed to the script
        if script_arguments and script_arguments != "empty":
            cmd += self._split_args(script_arguments)
        # Executes the process
        returncode = ManagedProcess.run(js7Step, cmd)
        js7Step.getLogger().info(f"[ManagedProcess]returncode={returncode}")
        js7Step.getOutcome().setReturnCode(returncode)

    # Uses the onProcessOrderCanceled() hook after the order has been canceled
    def onProcessOrderCanceled(self, js7Step):
        ManagedProcess.cancel(js7Step)
Explanation:
- The onProcessOrderCanceled() hook is used to terminate the subprocess if the order is canceled in JS7 JOC.
Note: Subprocesses run outside of the GraalVM Python environment. While the job waits for the subprocess to finish, stdout and stderr are processed in parallel threads, ensuring that output is streamed to the JS7 logger in real time.
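The poll/kill/wait sequence that the cancel function relies on can be observed with the standard subprocess module alone. The sketch below is an illustration outside of JS7; it assumes a CPython interpreter reachable via sys.executable and uses a sleeping child process as a stand-in for a long-running script.

```python
import subprocess
import sys

# Starts a child process that would otherwise run for 60 seconds
process = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])

# poll() returns None while the process is still running
still_running = process.poll() is None

# Kills the process only if it has not finished on its own
if still_running:
    process.kill()

# wait() reaps the process and returns its return code
rc = process.wait()
```

After kill(), the return code is non-zero. On POSIX systems it is typically the negative signal number; on Windows the value differs, so a job should not rely on a specific code after cancellation.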
Usage
The following usage examples show which arguments can be passed to the workflow.
Tar Example
| Key | Example Value | Description |
|---|---|---|
| executable_path | /usr/bin/tar | The path to an executable binary file. |
| arguments | xvf file.tar -C /tmp | A list of arguments passed to the binary, separated by spaces. |
| script_path | empty | Path to a script that is passed to the binary, for example a Python or shell script. The value empty means that no script is used. |
| script_arguments | empty | A list of arguments passed to the script, separated by spaces. |
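Because argument strings are split with shlex, values that contain spaces can be quoted. The following sketch shows how the two splitting modes used by the job differ; the argument string is a made-up example and not taken from the workflow above.

```python
import shlex

# Hypothetical argument string containing a quoted value
arg_string = '--label "nightly backup" -v'

# POSIX rules (used on Unix): quotes group the value and are removed
posix_args = shlex.split(arg_string, posix=True)

# Non-POSIX rules (used by the job on Windows): quotes group the value but are kept
windows_args = shlex.split(arg_string, posix=False)

print(posix_args)
print(windows_args)
```

In non-POSIX mode the quote characters remain part of the argument, which may matter if the called program does not strip them itself.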
Python Example
| Key | Example Value | Description |
|---|---|---|
| executable_path | /usr/bin/python | Path to the Python interpreter. |
| arguments | -u | Runs the Python interpreter in unbuffered mode so that output is written immediately. |
| script_path | /my_script.py | Path to the Python script. |
| script_arguments | empty | A list of arguments passed to the script, separated by spaces. |
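To illustrate how the argument values from the tables above translate into a command list, the following sketch mirrors the job's assembly logic in a standalone function. The helper name build_command and the plain dictionary input are assumptions for illustration; in the job, the values come from the JS7 step arguments.

```python
import shlex


def build_command(args, posix=True):
    # Hypothetical helper; the placeholder "empty" is treated as "not set"
    def is_set(value):
        return bool(value) and value != "empty"

    if not args.get("executable_path"):
        raise ValueError("No executable_path specified.")
    cmd = [args["executable_path"]]
    # Arguments for the executable come before the script path
    if is_set(args.get("arguments")):
        cmd += shlex.split(args["arguments"], posix=posix)
    if is_set(args.get("script_path")):
        cmd.append(args["script_path"])
    # Arguments for the script come last
    if is_set(args.get("script_arguments")):
        cmd += shlex.split(args["script_arguments"], posix=posix)
    return cmd


tar_cmd = build_command({
    "executable_path": "/usr/bin/tar",
    "arguments": "xvf file.tar -C /tmp",
    "script_path": "empty",
    "script_arguments": "empty",
})
# tar_cmd == ['/usr/bin/tar', 'xvf', 'file.tar', '-C', '/tmp']

python_cmd = build_command({
    "executable_path": "/usr/bin/python",
    "arguments": "-u",
    "script_path": "/my_script.py",
    "script_arguments": "empty",
})
# python_cmd == ['/usr/bin/python', '-u', '/my_script.py']
```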
Resources
Downloads
- Download Example Workflow (upload .json): pdCancelableSubprocessExample.workflow.json
- Download Example Include Script (upload .json): Python-ManagedProcessClass.includescript.json