5. ShellCommandTask

import nest_asyncio
# allow pydra's asyncio event loop to run inside Jupyter's already-running loop
nest_asyncio.apply()

In addition to FunctionTask, pydra lets you create tasks from shell commands with ShellCommandTask.

Let’s run a simple command, pwd, using pydra:

import pydra
cmd = "pwd"
# the command we want to run is passed via the executable argument
shelly = pydra.ShellCommandTask(name="shelly", executable=cmd)

# we can always check the cmdline of our task
shelly.cmdline
'pwd'

Now let’s run it:

with pydra.Submitter(plugin="cf") as sub:
    sub(shelly)

and check the result:

shelly.result()
Result(output=Output(return_code=0, stdout='/private/var/folders/wr/x5xt_yqs2cvc_gb3sf147lvc0000gn/T/tmpaq4wrb6w/ShellCommandTask_50215383834238ce614b4428a1e9943ddff15f8580a384a0a57b34c760864b3a\n', stderr=''), runtime=None, errored=False)

The result contains return_code, stdout and stderr. If everything goes well, return_code is 0, stdout holds the path of the task's working directory (pydra runs each task in its own directory, as the path above shows), and stderr is an empty string.
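The three fields are the ones you would get from any subprocess. Below is a minimal standard-library sketch of them. It assumes only that the command runs as a subprocess whose current directory is its working directory. The helper run_in_workdir and the ShellResult type are hypothetical and are not pydra code.

```python
import os
import subprocess
import tempfile
from typing import List, NamedTuple


class ShellResult(NamedTuple):
    return_code: int
    stdout: str
    stderr: str


def run_in_workdir(cmd: List[str], workdir: str) -> ShellResult:
    """Run cmd with workdir as its current directory and capture both output streams.

    Hypothetical helper for illustration; it is not part of pydra.
    """
    proc = subprocess.run(
        cmd, cwd=workdir, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
    )
    return ShellResult(proc.returncode, proc.stdout, proc.stderr)


with tempfile.TemporaryDirectory() as workdir:
    res = run_in_workdir(["pwd"], workdir)
    # pwd may print a resolved path (e.g. /private/var/... on macOS), so compare real paths
    same_dir = os.path.realpath(res.stdout.strip()) == os.path.realpath(workdir)
    print(res.return_code, same_dir, repr(res.stderr))  # 0 True ''
```

A non-zero return_code together with a non-empty stderr is the usual sign that the command itself failed.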

Commands with arguments and inputs

You can also run a longer command by passing a list to executable:

cmd = ["echo", "hail", "pydra"]
shelly = pydra.ShellCommandTask(name="shelly", executable=cmd)
print("cmdline =", shelly.cmdline)

with pydra.Submitter(plugin="cf") as sub:
    sub(shelly)
shelly.result()
cmdline = echo hail pydra
Result(output=Output(return_code=0, stdout='hail pydra\n', stderr=''), runtime=None, errored=False)

Using args

In addition to executable, we can pass the command's arguments separately through args. The last example can be rewritten as:

cmd = "echo"
args = ["hail", "pydra"]

shelly = pydra.ShellCommandTask(name="shelly", executable=cmd, args=args)
print("cmdline =", shelly.cmdline)

with pydra.Submitter(plugin="cf") as sub:
    sub(shelly)
shelly.result()
cmdline = echo hail pydra
Result(output=Output(return_code=0, stdout='hail pydra\n', stderr=''), runtime=None, errored=False)
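Both forms describe the same argument list. executable supplies the program, plus its leading arguments when it is a list, and args is appended after it. Here is a hypothetical helper, build_argv, that sketches this relationship in plain Python. It is not pydra's implementation. Treating a plain-string args as a single argument is an assumption of this sketch.

```python
from typing import List, Optional, Sequence, Union


def build_argv(
    executable: Union[str, Sequence[str]],
    args: Optional[Union[str, Sequence[str]]] = None,
) -> List[str]:
    """Combine executable and args into one argument list (illustrative only)."""
    # a string executable is a single program name; a list already holds program + arguments
    argv = [executable] if isinstance(executable, str) else list(executable)
    if isinstance(args, str):
        argv.append(args)  # assumption: a plain string is one argument
    elif args:
        argv.extend(args)
    return argv


list_form = build_argv(["echo", "hail", "pydra"])
args_form = build_argv("echo", ["hail", "pydra"])
print(list_form == args_form)  # True
print(" ".join(args_form))     # echo hail pydra
```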

Customized input

Every ShellCommandTask accepts executable and args, but we can also provide additional inputs. To do so, we first have to define a custom input_spec with the SpecInfo class:

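import attr

# a custom input spec: the fields of ShellSpec (executable, args, ...) plus a new "text" field
my_input_spec = pydra.specs.SpecInfo(
    name="Input",
    fields=[
        (
            "text",
            attr.ib(
                type=str,
                # position 1: placed right after the executable; argstr "": no flag before the value
                metadata={"position": 1, "argstr": "", "help_string": "text", "mandatory": True},
            ),
        )
    ],
    bases=(pydra.specs.ShellSpec,),
)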

Notice that to create your own input_spec, you have to provide a list of fields. Each element of fields can be written in one of several valid forms:

  • (name, attribute)

  • (name, type, default)

  • (name, type, default, metadata)

  • (name, type, metadata)

where name, type, and default are the name, type, and default value of the field, and attribute is defined with attr.ib. In the example above the attribute has a type and metadata, but the full specification can be found here.
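To make the four forms concrete, here is a hypothetical helper, normalize_field, that maps each one onto a single record. It is a sketch and does not describe how pydra parses fields internally. In particular, it tells (name, type, default) apart from (name, type, metadata) by checking whether the third element is a dict, and that rule is an assumption of this sketch.

```python
from typing import Any, NamedTuple, Optional

NO_DEFAULT = object()  # sentinel meaning "this field has no default value"


class FieldDef(NamedTuple):
    name: str
    type_: Any = None
    default: Any = NO_DEFAULT
    metadata: Optional[dict] = None
    attribute: Any = None  # only set by the (name, attribute) form


def normalize_field(field: tuple) -> FieldDef:
    """Map one field tuple onto a FieldDef (hypothetical helper)."""
    if len(field) == 2:  # (name, attribute), e.g. ("text", attr.ib(...))
        name, attribute = field
        return FieldDef(name, attribute=attribute)
    if len(field) == 3:
        name, type_, third = field
        if isinstance(third, dict):  # (name, type, metadata)
            return FieldDef(name, type_, metadata=third)
        return FieldDef(name, type_, default=third)  # (name, type, default)
    if len(field) == 4:  # (name, type, default, metadata)
        name, type_, default, metadata = field
        return FieldDef(name, type_, default, metadata)
    raise ValueError(f"unsupported field specification: {field!r}")


print(normalize_field(("text", str, {"position": 1, "help_string": "text"})).metadata)
# {'position': 1, 'help_string': 'text'}
print(normalize_field(("newfile", str, "newfile_tmp.txt")).default)  # newfile_tmp.txt
```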

In metadata, you can provide additional information used by pydra. help_string is the only required key, and the full list of supported keys is ['position', 'argstr', 'requires', 'mandatory', 'allowed_values', 'output_field_name', 'copyfile', 'separate_ext', 'container_path', 'help_string', 'xor', 'output_file_template']. Among the supported keys:

  • help_string: a string, a description of the argument;

  • position: an integer greater than 0 that defines the relative position of the argument when the shell command is constructed;

  • argstr: a string, e.g. “-o”, that specifies a flag for the command argument, if one is needed;

  • mandatory: a bool; if True, pydra raises an exception if the argument is not provided.

The complete documentation for all supported keys is available here.
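To show how position, argstr and mandatory interact, here is a deliberately simplified sketch that turns input values into a command line. The helper assemble_cmdline, the dict-based field format and the tool name mytool are hypothetical. pydra's real logic handles more keys and cases, for example requires, xor and output_file_template.

```python
from typing import Any, Dict, List, Tuple


def assemble_cmdline(executable: str, fields: Dict[str, dict], values: Dict[str, Any]) -> str:
    """Build a command line from per-field metadata (simplified, illustrative only).

    fields maps each input name to its metadata dict; values maps input names to values.
    """
    parts: List[Tuple[float, str]] = []
    for name, meta in fields.items():
        value = values.get(name)
        if value is None:
            if meta.get("mandatory", False):
                # a missing mandatory input is an error, as described for `mandatory`
                raise ValueError(f"mandatory input {name!r} was not provided")
            continue  # optional input that was not set: leave it out of the command
        argstr = meta.get("argstr", "")
        # an empty argstr means the value is inserted on its own, without a flag
        token = f"{argstr} {value}" if argstr else str(value)
        # assumption of this sketch: inputs without a position go last
        parts.append((meta.get("position", float("inf")), token))
    # arguments appear in increasing order of position (sorted is stable for ties)
    ordered = [token for _, token in sorted(parts, key=lambda item: item[0])]
    return " ".join([executable] + ordered)


text_field = {"text": {"position": 1, "argstr": "", "mandatory": True, "help_string": "text"}}
print(assemble_cmdline("echo", text_field, {"text": "HELLO"}))  # echo HELLO

# hypothetical tool with an optional "-o" flag declared before the positional input
tool_fields = {
    "out": {"position": 2, "argstr": "-o", "help_string": "output file"},
    "infile": {"position": 1, "argstr": "", "mandatory": True, "help_string": "input file"},
}
print(assemble_cmdline("mytool", tool_fields, {"infile": "in.txt", "out": "out.txt"}))
# mytool in.txt -o out.txt
```

Field order in the spec does not matter here. Only position decides where each argument lands.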

To define my_input_spec we used the most general syntax, (name, attribute), but perhaps the simplest syntax is the last one, (name, type, metadata). Using this syntax, my_input_spec could look like this:

my_input_spec_short = pydra.specs.SpecInfo(
    name="Input",
    fields=[
        ("text", str, {"position": 1, "help_string": "text", "mandatory": True}),
    ],
    bases=(pydra.specs.ShellSpec,),
)

After defining my_input_spec, we can define our task:

cmd_exec = "echo"
hello = "HELLO"
shelly = pydra.ShellCommandTask(
    name="shelly", executable=cmd_exec, text=hello, input_spec=my_input_spec
)

print("cmdline =", shelly.cmdline)

with pydra.Submitter(plugin="cf") as sub:
    sub(shelly)
shelly.result()
cmdline = echo HELLO
Result(output=Output(return_code=0, stdout='HELLO\n', stderr=''), runtime=None, errored=False)

Customized output

We can also customize the output if we want to return something more than stdout, e.g. a file.

my_output_spec = pydra.specs.SpecInfo(
    name="Output",
    # (name, type, default) syntax: the default is the name of the file the command creates
    fields=[("newfile", pydra.specs.File, "newfile_tmp.txt")],
    bases=(pydra.specs.ShellOutSpec,),
)

Now we can create a task that returns the new file:

cmd = ["touch", "newfile_tmp.txt"]
shelly = pydra.ShellCommandTask(name="shelly", executable=cmd, output_spec=my_output_spec)

print("cmdline =", shelly.cmdline)

with pydra.Submitter(plugin="cf") as sub:
    sub(shelly)
shelly.result()
cmdline = touch newfile_tmp.txt
Result(output=Output(return_code=0, stdout='', stderr='', newfile=PosixPath('/private/var/folders/wr/x5xt_yqs2cvc_gb3sf147lvc0000gn/T/tmpyucu6ar3/ShellCommandTask_76e73f8b52e6d5d1920bda2a22e309fb4fd89cec14e6ab25755e2caf66cf0294/newfile_tmp.txt')), runtime=None, errored=False)
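The returned newfile path lies inside the task's working directory. The default "newfile_tmp.txt" in the output spec names the file, and the file is found relative to the directory the command ran in. Below is a minimal standard-library sketch of that lookup. It assumes exactly this behaviour and is not pydra's collection code. The helper collect_output_files is hypothetical.

```python
import subprocess
import tempfile
from pathlib import Path
from typing import Dict


def collect_output_files(workdir: str, output_files: Dict[str, str]) -> Dict[str, Path]:
    """Resolve each declared output file name against the working directory.

    output_files maps an output field name to the file name the command should create.
    """
    collected = {}
    for field, filename in output_files.items():
        path = Path(workdir) / filename
        if not path.exists():
            raise FileNotFoundError(f"output {field!r}: expected file {path} was not created")
        collected[field] = path
    return collected


with tempfile.TemporaryDirectory() as workdir:
    # run the command inside the working directory, as the task above does
    subprocess.run(["touch", "newfile_tmp.txt"], cwd=workdir, check=True)
    outputs = collect_output_files(workdir, {"newfile": "newfile_tmp.txt"})
    print(outputs["newfile"].name)  # newfile_tmp.txt
```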

Exercise 1

Write a task that creates two new files, using the provided output spec.

cmd = "touch"
args = ["newfile_1.txt", "newfile_2.txt"]

my_output_spec = pydra.specs.SpecInfo(
    name="Output",
    fields=[
        (
            "out1",
            attr.ib(
                type=pydra.specs.File,
                metadata={
                    "output_file_template": "{args}",
                    "help_string": "output file",
                },
            ),
        )
    ],
    bases=(pydra.specs.ShellOutSpec,),
)

DO NOT RUN IF Docker IS NOT AVAILABLE

Note that the following tasks use Docker, so they will fail if Docker is not available. They will also fail in Binder.
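Without Docker, these cells fail with FileNotFoundError because the docker executable cannot be found. A small guard built on the standard library's shutil.which can check for the executable before you run them. docker_available is a hypothetical helper. Finding the binary does not guarantee that the Docker daemon is running.

```python
import shutil


def docker_available() -> bool:
    """Return True if a docker executable can be found on PATH.

    This only checks for the client binary; the Docker daemon may still be unreachable.
    """
    return shutil.which("docker") is not None


if docker_available():
    print("docker found: the DockerTask cells below can be tried")
else:
    print("docker not found: skip the DockerTask cells below")
```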

DockerTask

All the commands can also be run in a Docker container using DockerTask. The syntax is very similar, but the additional argument image is required.
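Conceptually, running in a container means prefixing the command with a docker run invocation for the given image. The sketch below shows only that idea. The helper docker_argv is hypothetical. pydra's real command line will differ, since it also has to make the task's files visible inside the container.

```python
from typing import List, Sequence, Union


def docker_argv(image: str, command: Union[str, Sequence[str]]) -> List[str]:
    """Wrap a command so that it runs inside a throwaway container of `image`.

    Hypothetical helper: `docker run --rm IMAGE CMD...` runs CMD in a new container
    and removes the container when it exits.
    """
    cmd = [command] if isinstance(command, str) else list(command)
    return ["docker", "run", "--rm", image] + cmd


print(" ".join(docker_argv("busybox", "whoami")))  # docker run --rm busybox whoami
```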

cmd = "whoami"
docky = pydra.DockerTask(name="docky", executable=cmd, image="busybox")

with pydra.Submitter() as sub:
    docky(submitter=sub)

docky.result()
FileNotFoundError: [Errno 2] No such file or directory: 'docker': 'docker'

Exercise 2

Use a splitter to run the same command in two different images:
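Conceptually, .split("image") turns one task whose image input is a list into one run per list element, each with a single image. Below is a plain-Python sketch of that expansion. The helper expand_split is hypothetical. It ignores combining and more complex splitters that involve several inputs.

```python
from typing import Any, Dict, List


def expand_split(inputs: Dict[str, Any], field: str) -> List[Dict[str, Any]]:
    """Expand one set of inputs into one set per element of inputs[field]."""
    return [{**inputs, field: value} for value in inputs[field]]


runs = expand_split({"executable": "whoami", "image": ["busybox", "ubuntu"]}, "image")
for run in runs:
    print(run)
# {'executable': 'whoami', 'image': 'busybox'}
# {'executable': 'whoami', 'image': 'ubuntu'}
```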

cmd = "whoami"
docky = pydra.DockerTask(name="docky", executable=cmd, image=["busybox", "ubuntu"]).split("image")

with pydra.Submitter() as sub:
    docky(submitter=sub)

docky.result()
FileNotFoundError: [Errno [Errno 2] No such file or directory: 'docker': 'docker']  full crash report is here: /private/var/folders/wr/x5xt_yqs2cvc_gb3sf147lvc0000gn/T/tmpt7b7hoh2/DockerTask_ee1b8f2c3d054720481a37147112da91d5af039f4dc386709bd2e1e952f85b15/_error.pklz
#write your solution here

Using ShellCommandTask with the container_info argument

You can also run a shell command in a Docker container by adding the container_info argument to ShellCommandTask:

shelly = pydra.ShellCommandTask(name="shelly", executable="whoami", container_info=("docker", "busybox"))
with pydra.Submitter() as sub:
    shelly(submitter=sub)

shelly.result()
FileNotFoundError: [Errno 2] No such file or directory: 'docker': 'docker'

If we don’t provide container_info, the command runs directly on the host, so the output is different:

shelly = pydra.ShellCommandTask(name="shelly", executable="whoami")
with pydra.Submitter() as sub:
    shelly(submitter=sub)

shelly.result()
Result(output=Output(return_code=0, stdout='yibeichen\n', stderr=''), runtime=None, errored=False)