5. ShellCommandTask
import nest_asyncio
nest_asyncio.apply()
In addition to FunctionTask, pydra allows for creating tasks from shell commands by using ShellCommandTask.
Let’s run a simple command, pwd, using pydra:
import pydra
cmd = "pwd"
# we should use executable to pass the command we want to run
shelly = pydra.ShellCommandTask(name="shelly", executable=cmd)
# we can always check the cmdline of our task
shelly.cmdline
'pwd'
Now let’s try to run it:
with pydra.Submitter(plugin="cf") as sub:
    sub(shelly)
and check the result:
shelly.result()
Result(output=Output(return_code=0, stdout='/private/var/folders/wr/x5xt_yqs2cvc_gb3sf147lvc0000gn/T/tmpaq4wrb6w/ShellCommandTask_50215383834238ce614b4428a1e9943ddff15f8580a384a0a57b34c760864b3a\n', stderr=''), runtime=None, errored=False)
The result should have return_code, stdout, and stderr. If everything goes well, return_code should be 0, stdout should contain the working directory, and stderr should be an empty string.
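For comparison, the same three fields can be observed with the standard library alone. The sketch below is not pydra code; it assumes a POSIX system where pwd is available, and it runs the command in a temporary directory to mimic the way the task above ran in its own working directory.

```python
import subprocess
import tempfile

# Run `pwd` in a fresh temporary directory and capture both output streams.
with tempfile.TemporaryDirectory() as workdir:
    completed = subprocess.run(
        ["pwd"], cwd=workdir, capture_output=True, text=True
    )

# These three values play the role of return_code, stdout, and stderr.
print("return_code:", completed.returncode)
print("stdout:", repr(completed.stdout))
print("stderr:", repr(completed.stderr))
```

A non-zero return code or a non-empty stderr is usually the first sign that the command itself failed, independently of the task machinery around it.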
Commands with arguments and inputs#
You can also run a longer command by providing a list:
cmd = ["echo", "hail", "pydra"]
shelly = pydra.ShellCommandTask(name="shelly", executable=cmd)
print("cmndline = ", shelly.cmdline)
with pydra.Submitter(plugin="cf") as sub:
    sub(shelly)
shelly.result()
cmndline = echo hail pydra
Result(output=Output(return_code=0, stdout='hail pydra\n', stderr=''), runtime=None, errored=False)
using args#
In addition to executable, we can also use args. The last example can also be rewritten as:
cmd = "echo"
args = ["hail", "pydra"]
shelly = pydra.ShellCommandTask(name="shelly", executable=cmd, args=args)
print("cmndline = ", shelly.cmdline)
with pydra.Submitter(plugin="cf") as sub:
    sub(shelly)
shelly.result()
cmndline = echo hail pydra
Result(output=Output(return_code=0, stdout='hail pydra\n', stderr=''), runtime=None, errored=False)
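Both variants above produce the same command line. A minimal sketch of that equivalence follows; build_cmdline is a hypothetical helper written for illustration and is not part of pydra.

```python
def build_cmdline(executable, args=None):
    """Join executable and args into one command-line string (hypothetical helper)."""
    # A plain string is treated as a single token, a list as several tokens.
    parts = [executable] if isinstance(executable, str) else list(executable)
    if args is not None:
        parts += [args] if isinstance(args, str) else list(args)
    return " ".join(str(part) for part in parts)


# The whole command given as one list ...
print(build_cmdline(["echo", "hail", "pydra"]))
# ... or split into an executable and its args.
print(build_cmdline("echo", ["hail", "pydra"]))
```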
Customized input#
Pydra always checks executable and args, but we can also provide additional inputs. To do so, we first have to modify input_spec by using the SpecInfo class:
import attr
my_input_spec = pydra.specs.SpecInfo(
    name="Input",
    fields=[
        (
            "text",
            attr.ib(
                type=str,
                metadata={"position": 1, "argstr": "", "help_string": "text", "mandatory": True},
            ),
        )
    ],
    bases=(pydra.specs.ShellSpec,),
)
Notice that in order to create your own input_spec, you have to provide a list of fields. There are several valid ways to specify the elements of fields:
(name, attribute)
(name, type, default)
(name, type, default, metadata)
(name, type, metadata)
where name, type, and default are the name, type, and default value of the field. attribute is defined by using attr.ib. In the example, the attribute has type and metadata, but the full specification can be found in the attrs documentation.
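The four tuple forms can be told apart by their length and, for three-element tuples, by the type of the last element. The sketch below shows one way to do that; normalize_field and NO_DEFAULT are hypothetical names, and the rule "a dict in the last position is metadata" is an assumption made for this illustration, not a statement about pydra's internals.

```python
NO_DEFAULT = object()  # sentinel meaning "this field has no default value"


def normalize_field(field):
    """Turn any of the four tuple forms into a dict (hypothetical helper)."""
    name, *rest = field
    if len(rest) == 1:
        # (name, attribute): keep the attribute object untouched
        return {"name": name, "attribute": rest[0]}
    if len(rest) == 2:
        tp, last = rest
        # Assumption: a dict in the last position is metadata,
        # anything else is a default value.
        if isinstance(last, dict):
            return {"name": name, "type": tp, "default": NO_DEFAULT, "metadata": last}
        return {"name": name, "type": tp, "default": last, "metadata": {}}
    if len(rest) == 3:
        # (name, type, default, metadata)
        tp, default, metadata = rest
        return {"name": name, "type": tp, "default": default, "metadata": metadata}
    raise ValueError(f"unsupported field specification: {field!r}")


print(normalize_field(("text", str, {"help_string": "text"})))
print(normalize_field(("count", int, 3)))
```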
In metadata, you can provide additional information that is used by pydra. help_string is the only key that is required, and the full list of supported keys is ['position', 'argstr', 'requires', 'mandatory', 'allowed_values', 'output_field_name', 'copyfile', 'separate_ext', 'container_path', 'help_string', 'xor', 'output_file_template']. Among the supported keys, you have:
help_string: a string, description of the argument;
position: an integer greater than 0, defines the relative position of the arguments when the shell command is constructed;
argstr: a string, e.g. “-o”, can be used to specify a flag if needed for the command argument;
mandatory: a bool, if True, pydra will raise an exception if the argument is not provided.
The complete documentation for all supported keys is available in the pydra documentation.
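To make the roles of position, argstr, and mandatory concrete, here is a small stand-alone sketch that builds a command line from such metadata. The assemble function is hypothetical and only imitates the behaviour described above; placing fields without a position at the end is an assumption of this sketch.

```python
def assemble(executable, fields, values):
    """Build a command as a list of tokens from (name, metadata) pairs (hypothetical)."""
    positioned = []
    for name, meta in fields:
        value = values.get(name)
        if value is None:
            # mandatory: a missing value is an error
            if meta.get("mandatory", False):
                raise ValueError(f"mandatory input '{name}' was not provided")
            continue
        # argstr: an optional flag placed in front of the value
        argstr = meta.get("argstr", "")
        tokens = [argstr, str(value)] if argstr else [str(value)]
        # position: relative order of the arguments (assumed last if missing)
        positioned.append((meta.get("position", float("inf")), tokens))
    positioned.sort(key=lambda item: item[0])
    command = [executable]
    for _, tokens in positioned:
        command.extend(tokens)
    return command


fields = [
    ("out", {"position": 2, "argstr": "-o", "help_string": "output name"}),
    ("text", {"position": 1, "argstr": "", "help_string": "text", "mandatory": True}),
]
print(assemble("echo", fields, {"text": "HELLO"}))
print(assemble("echo", fields, {"text": "HELLO", "out": "f.txt"}))
```

Calling assemble("echo", fields, {}) raises ValueError because text is marked as mandatory.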
To define my_input_spec we used the most general syntax, (name, attribute), but perhaps the simplest syntax is the last one, (name, type, metadata). Using this syntax, my_input_spec could look like this:
my_input_spec_short = pydra.specs.SpecInfo(
    name="Input",
    fields=[
        ("text", str, {"position": 1, "help_string": "text", "mandatory": True}),
    ],
    bases=(pydra.specs.ShellSpec,),
)
After defining my_input_spec, we can define our task:
cmd_exec = "echo"
hello = "HELLO"
shelly = pydra.ShellCommandTask(
    name="shelly", executable=cmd_exec, text=hello, input_spec=my_input_spec
)
print("cmndline = ", shelly.cmdline)
with pydra.Submitter(plugin="cf") as sub:
    sub(shelly)
shelly.result()
cmndline = echo HELLO
Result(output=Output(return_code=0, stdout='HELLO\n', stderr=''), runtime=None, errored=False)
Customized output#
We can also customize the output if we want to return something more than stdout, e.g. a file.
my_output_spec = pydra.specs.SpecInfo(
    name="Output",
    fields=[("newfile", pydra.specs.File, "newfile_tmp.txt")],
    bases=(pydra.specs.ShellOutSpec,),
)
Now we can create a task that returns a new file:
cmd = ["touch", "newfile_tmp.txt"]
shelly = pydra.ShellCommandTask(name="shelly", executable=cmd, output_spec=my_output_spec)
print("cmndline = ", shelly.cmdline)
with pydra.Submitter(plugin="cf") as sub:
    sub(shelly)
shelly.result()
cmndline = touch newfile_tmp.txt
Result(output=Output(return_code=0, stdout='', stderr='', newfile=PosixPath('/private/var/folders/wr/x5xt_yqs2cvc_gb3sf147lvc0000gn/T/tmpyucu6ar3/ShellCommandTask_76e73f8b52e6d5d1920bda2a22e309fb4fd89cec14e6ab25755e2caf66cf0294/newfile_tmp.txt')), runtime=None, errored=False)
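The newfile path in the result points into the task's own working directory. The idea of running a command in a directory and then looking up a declared file name there can be sketched with the standard library. This is an illustration only, assuming a POSIX system with touch available; it does not show how pydra collects outputs internally.

```python
import subprocess
import tempfile
from pathlib import Path

# Run `touch` inside a temporary working directory, then resolve the
# declared output file name relative to that directory.
with tempfile.TemporaryDirectory() as workdir:
    completed = subprocess.run(
        ["touch", "newfile_tmp.txt"], cwd=workdir, capture_output=True, text=True
    )
    newfile = Path(workdir) / "newfile_tmp.txt"
    created = newfile.exists()  # check while the directory still exists

print("return_code:", completed.returncode)
print("file created:", created)
```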
Exercise 1#
Write a task that creates two new files, using the provided output spec.
cmd = "touch"
args = ["newfile_1.txt", "newfile_2.txt"]
my_output_spec = pydra.specs.SpecInfo(
    name="Output",
    fields=[
        (
            "out1",
            attr.ib(
                type=pydra.specs.File,
                metadata={
                    "output_file_template": "{args}",
                    "help_string": "output file",
                },
            ),
        )
    ],
    bases=(pydra.specs.ShellOutSpec,),
)
DO NOT RUN IF Docker IS NOT AVAILABLE
Note that the following tasks use Docker, so they will fail if Docker is not available. They will also fail in Binder.
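One way to guard the cells below is to check for the docker executable first. The sketch uses shutil.which from the standard library; docker_available is a hypothetical helper name. Finding the executable on PATH does not guarantee that the Docker daemon is running, so this is only a first check.

```python
import shutil


def docker_available():
    """Return True if a `docker` executable is found on PATH (hypothetical helper)."""
    return shutil.which("docker") is not None


if docker_available():
    print("docker found; the Docker examples can be run")
else:
    print("docker not found; skip the Docker examples")
```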
DockerTask#
All the commands can also be run in a Docker container using DockerTask. The syntax is very similar, but an additional argument, image, is required.
cmd = "whoami"
docky = pydra.DockerTask(name="docky", executable=cmd, image="busybox")
with pydra.Submitter() as sub:
    docky(submitter=sub)
docky.result()
FileNotFoundError: [Errno 2] No such file or directory: 'docker': 'docker'
Exercise 2#
Use splitter to run the same command in two different images:
cmd = "whoami"
docky = pydra.DockerTask(name="docky", executable=cmd, image=["busybox", "ubuntu"]).split("image")
with pydra.Submitter() as sub:
    docky(submitter=sub)
docky.result()
FileNotFoundError: [Errno [Errno 2] No such file or directory: 'docker': 'docker'] full crash report is here: /private/var/folders/wr/x5xt_yqs2cvc_gb3sf147lvc0000gn/T/tmpt7b7hoh2/DockerTask_ee1b8f2c3d054720481a37147112da91d5af039f4dc386709bd2e1e952f85b15/_error.pklz
#write your solution here
Using ShellCommandTask with container_info argument#
You can run the shell command in a Docker container by adding the container_info argument to ShellCommandTask:
shelly = pydra.ShellCommandTask(name="shelly", executable="whoami", container_info=("docker", "busybox"))
with pydra.Submitter() as sub:
    shelly(submitter=sub)
shelly.result()
FileNotFoundError: [Errno 2] No such file or directory: 'docker': 'docker'
If we don’t provide container_info, the output should be different:
shelly = pydra.ShellCommandTask(name="shelly", executable="whoami")
with pydra.Submitter() as sub:
    shelly(submitter=sub)
shelly.result()
Result(output=Output(return_code=0, stdout='yibeichen\n', stderr=''), runtime=None, errored=False)
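The difference comes from where the command runs: without a container, whoami reports the local user, while inside a container it reports the user of that container. Conceptually, running a command in a container means prefixing it with a docker run call. The sketch below illustrates that idea; wrap_in_container is a hypothetical helper, and the command pydra actually builds is likely to include further options (for example bind mounts and a working directory) that are not shown here.

```python
def wrap_in_container(cmd, container_info=None):
    """Prefix cmd with a `docker run` call when container_info is given (hypothetical)."""
    if container_info is None:
        # No container requested: run the command as it is.
        return list(cmd)
    runtime, image = container_info
    if runtime != "docker":
        raise ValueError(f"this sketch only handles docker, got {runtime!r}")
    # `--rm` removes the container once the command has finished.
    return ["docker", "run", "--rm", image, *cmd]


print(wrap_in_container(["whoami"]))
print(wrap_in_container(["whoami"], container_info=("docker", "busybox")))
```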