From 953f3df186db0d80821d2d17d6c90149366ee4c8 Mon Sep 17 00:00:00 2001 From: CI Date: Tue, 8 Apr 2025 07:56:01 +0000 Subject: [PATCH] Build branch main with version main (8f9353f) Build pipeline: viash-hub.craftbox.main-w5ngd Source commit: https://github.com/viash-hub/craftbox/commit/8f9353f15e4d6952eca57e896f962a60b42b0a3c Source message: add documentation and bump viash (#9) --- CHANGELOG.md | 6 + _viash.yaml | 2 +- src/_authors/dorien_roosen.yaml | 10 + src/_authors/robrecht_cannoodt.yaml | 14 + src/concat_text/config.vsh.yaml | 4 + src/csv2fasta/config.vsh.yaml | 8 + src/sync_resources/config.vsh.yaml | 13 +- src/untar/config.vsh.yaml | 8 + .../executable/concat_text/.config.vsh.yaml | 12 +- target/executable/concat_text/concat_text | 84 ++-- target/executable/csv2fasta/.config.vsh.yaml | 43 +- target/executable/csv2fasta/csv2fasta | 181 ++++--- .../sync_resources/.config.vsh.yaml | 47 +- .../executable/sync_resources/sync_resources | 103 ++-- target/executable/untar/.config.vsh.yaml | 43 +- target/executable/untar/untar | 89 ++-- target/nextflow/concat_text/.config.vsh.yaml | 12 +- target/nextflow/concat_text/main.nf | 405 ++++++++++++--- target/nextflow/csv2fasta/.config.vsh.yaml | 43 +- target/nextflow/csv2fasta/main.nf | 461 +++++++++++++++--- target/nextflow/csv2fasta/nextflow.config | 1 + .../nextflow/sync_resources/.config.vsh.yaml | 47 +- target/nextflow/sync_resources/main.nf | 461 +++++++++++++++--- .../nextflow/sync_resources/nextflow.config | 3 +- .../sync_resources/nextflow_schema.json | 2 +- target/nextflow/untar/.config.vsh.yaml | 43 +- target/nextflow/untar/main.nf | 459 ++++++++++++++--- target/nextflow/untar/nextflow.config | 1 + 28 files changed, 2101 insertions(+), 504 deletions(-) create mode 100644 src/_authors/dorien_roosen.yaml create mode 100644 src/_authors/robrecht_cannoodt.yaml diff --git a/CHANGELOG.md b/CHANGELOG.md index b3598e5..1b0a6cc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ * `sync_resources`: Sync a 
Viash package's test resources to the local filesystem (PR #7). +## MINOR CHANGES + +* Add documentation to multiple components (PR #9). + +* Bump Viash to 0.9.3 (PR #9). + ## BUG FIXES * `untar`: Fix usage of a deprecated environment variable (PR #8). diff --git a/_viash.yaml b/_viash.yaml index 204a056..695b89e 100644 --- a/_viash.yaml +++ b/_viash.yaml @@ -7,7 +7,7 @@ links: issue_tracker: https://github.com/viash-hub/craftbox/issues repository: https://github.com/viash-hub/craftbox -viash_version: 0.9.0 +viash_version: 0.9.3 config_mods: | .requirements.commands := ['ps'] diff --git a/src/_authors/dorien_roosen.yaml b/src/_authors/dorien_roosen.yaml new file mode 100644 index 0000000..d67448d --- /dev/null +++ b/src/_authors/dorien_roosen.yaml @@ -0,0 +1,10 @@ +name: Dorien Roosen +info: + links: + email: dorien@data-intuitive.com + github: dorien-er + linkedin: dorien-roosen + organizations: + - name: Data Intuitive + href: https://www.data-intuitive.com + role: Data Scientist diff --git a/src/_authors/robrecht_cannoodt.yaml b/src/_authors/robrecht_cannoodt.yaml new file mode 100644 index 0000000..c4c1bde --- /dev/null +++ b/src/_authors/robrecht_cannoodt.yaml @@ -0,0 +1,14 @@ +name: Robrecht Cannoodt +info: + links: + email: robrecht@data-intuitive.com + github: rcannood + orcid: "0000-0003-3641-729X" + linkedin: robrechtcannoodt + organizations: + - name: Data Intuitive + href: https://www.data-intuitive.com + role: Data Science Engineer + - name: Open Problems + href: https://openproblems.bio + role: Core Member diff --git a/src/concat_text/config.vsh.yaml b/src/concat_text/config.vsh.yaml index 6a825ac..a76f83e 100644 --- a/src/concat_text/config.vsh.yaml +++ b/src/concat_text/config.vsh.yaml @@ -1,19 +1,23 @@ name: concat_text +summary: Concatenate a number of text files description: | Concatenate a number of text files, handle gzipped text files gracefully and optionally gzip the output text file. 
This component is useful for concatening fastq files from different lanes, for instance. + authors: - __merge__: /src/_authors/toni_verbeiren.yaml roles: [ author, maintainer ] - __merge__: /src/_authors/dries_schaumont.yaml roles: [ reviewer ] + info: improvements: | This component could be improved in 2 ways: 1. Allow for a mix of zipped and plain input files 2. Allow to specify a compression algorithm for the output + argument_groups: - name: Input arguments arguments: diff --git a/src/csv2fasta/config.vsh.yaml b/src/csv2fasta/config.vsh.yaml index c49f73e..3f63ed2 100644 --- a/src/csv2fasta/config.vsh.yaml +++ b/src/csv2fasta/config.vsh.yaml @@ -1,4 +1,5 @@ name: csv2fasta +summary: Convert a CSV file to FASTA entries description: | Convert two columns from a CSV file to FASTA entries. The CSV file can contain an optional header and each row (other than the header) becomes @@ -6,6 +7,13 @@ description: | for the FASTA entries, while the other become the sequences. The sequences column must only contain characters that are valid IUPAC notation for nucleotides or a group thereof (wildcard characters). + +authors: + - __merge__: /src/_authors/dries_schaumont.yaml + roles: [ author, maintainer ] + - __merge__: /src/_authors/robrecht_cannoodt.yaml + roles: [ reviewer ] + argument_groups: - name: Inputs arguments: diff --git a/src/sync_resources/config.vsh.yaml b/src/sync_resources/config.vsh.yaml index a6a144c..c421c8a 100644 --- a/src/sync_resources/config.vsh.yaml +++ b/src/sync_resources/config.vsh.yaml @@ -1,8 +1,19 @@ name: sync_resources -description: Sync a Viash package's test resources to the local filesystem +summary: Sync a Viash package's test resources to the local filesystem +description: | + Sync a Viash package's test resources to the local filesystem based on the + the `.info.test_resources` field in the `_viash.yaml` file. This is useful for + testing and debugging purposes. usage: | sync_resources sync_resources --input _viash.yaml --output . 
+ +authors: + - __merge__: /src/_authors/robrecht_cannoodt.yaml + roles: [ author, maintainer ] + - __merge__: /src/_authors/dries_schaumont.yaml + roles: [ reviewer ] + argument_groups: - name: Inputs arguments: diff --git a/src/untar/config.vsh.yaml b/src/untar/config.vsh.yaml index 727ed61..10e12ca 100644 --- a/src/untar/config.vsh.yaml +++ b/src/untar/config.vsh.yaml @@ -1,7 +1,15 @@ name: untar +summary: Unpack a .tar file description: | Unpack a .tar file. When the contents of the .tar file is just a single directory, put the contents of the directory into the output folder instead of that directory. + +authors: + - __merge__: /src/_authors/dries_schaumont.yaml + roles: [ author, maintainer ] + - __merge__: /src/_authors/robrecht_cannoodt.yaml + roles: [ reviewer ] + argument_groups: - name: Input arguments arguments: diff --git a/target/executable/concat_text/.config.vsh.yaml b/target/executable/concat_text/.config.vsh.yaml index 3a5ced8..9c85e02 100644 --- a/target/executable/concat_text/.config.vsh.yaml +++ b/target/executable/concat_text/.config.vsh.yaml @@ -64,6 +64,7 @@ resources: - type: "bash_script" path: "script.sh" is_executable: true +summary: "Concatenate a number of text files" description: "Concatenate a number of text files, handle gzipped text files gracefully\ \ and\noptionally gzip the output text file.\n\nThis component is useful for concatening\ \ fastq files from different lanes, for instance.\n" @@ -76,6 +77,9 @@ info: \ of zipped and plain input files\n 2. 
Allow to specify a compression algorithm\ \ for the output\n" status: "enabled" +scope: + image: "public" + target: "public" requirements: commands: - "ps" @@ -170,16 +174,16 @@ build_info: engine: "docker|native" output: "target/executable/concat_text" executable: "target/executable/concat_text/concat_text" - viash_version: "0.9.0" - git_commit: "3c8413009764e3a6839e3e8b038857caf7047593" + viash_version: "0.9.3" + git_commit: "8f9353f15e4d6952eca57e896f962a60b42b0a3c" git_remote: "https://github.com/viash-hub/craftbox" - git_tag: "v0.1.0-3-g3c84130" + git_tag: "v0.1.0-4-g8f9353f" package_config: name: "craftbox" version: "main" description: "A collection of custom-tailored scripts and applied tools.\n" info: null - viash_version: "0.9.0" + viash_version: "0.9.3" source: "src" target: "target" config_mods: diff --git a/target/executable/concat_text/concat_text b/target/executable/concat_text/concat_text index 4123046..ec73d5a 100755 --- a/target/executable/concat_text/concat_text +++ b/target/executable/concat_text/concat_text @@ -2,7 +2,7 @@ # concat_text main # -# This wrapper script is auto-generated by viash 0.9.0 and is thus a derivative +# This wrapper script is auto-generated by viash 0.9.3 and is thus a derivative # work thereof. This software comes with ABSOLUTELY NO WARRANTY from Data # Intuitive. # @@ -173,32 +173,6 @@ VIASH_META_CONFIG="$VIASH_META_RESOURCES_DIR/.config.vsh.yaml" VIASH_META_TEMP_DIR="$VIASH_TEMP" -# ViashHelp: Display helpful explanation about this executable -function ViashHelp { - echo "concat_text main" - echo "" - echo "Concatenate a number of text files, handle gzipped text files gracefully and" - echo "optionally gzip the output text file." - echo "" - echo "This component is useful for concatening fastq files from different lanes, for" - echo "instance." 
- echo "" - echo "Input arguments:" - echo " --input" - echo " type: file, required parameter, multiple values allowed, file must exist" - echo " example: input?.txt.gz" - echo " A list of (gzipped) text files." - echo "" - echo "Output arguments:" - echo " --gzip_output" - echo " type: boolean_true" - echo " Should the output be zipped?" - echo "" - echo " --output" - echo " type: file, output, file must exist" - echo " example: output.txt" - echo " File to write the output to, optionally gzipped." -} # initialise variables VIASH_MODE='run' @@ -479,9 +453,9 @@ RUN apk add --no-cache bash procps file LABEL org.opencontainers.image.authors="Toni Verbeiren, Dries Schaumont" LABEL org.opencontainers.image.description="Companion container for running component concat_text" -LABEL org.opencontainers.image.created="2025-04-08T07:25:22Z" +LABEL org.opencontainers.image.created="2025-04-08T07:49:55Z" LABEL org.opencontainers.image.source="https://github.com/viash-hub/craftbox" -LABEL org.opencontainers.image.revision="3c8413009764e3a6839e3e8b038857caf7047593" +LABEL org.opencontainers.image.revision="8f9353f15e4d6952eca57e896f962a60b42b0a3c" LABEL org.opencontainers.image.version="main" VIASHDOCKER @@ -596,6 +570,58 @@ fi # initialise docker variables VIASH_DOCKER_RUN_ARGS=(-i --rm) + +# ViashHelp: Display helpful explanation about this executable +function ViashHelp { + echo "concat_text main" + echo "" + echo "Concatenate a number of text files, handle gzipped text files gracefully and" + echo "optionally gzip the output text file." + echo "" + echo "This component is useful for concatening fastq files from different lanes, for" + echo "instance." + echo "" + echo "Input arguments:" + echo " --input" + echo " type: file, required parameter, multiple values allowed, file must exist" + echo " example: input?.txt.gz" + echo " A list of (gzipped) text files." 
+ echo "" + echo "Output arguments:" + echo " --gzip_output" + echo " type: boolean_true" + echo " Should the output be zipped?" + echo "" + echo " --output" + echo " type: file, output, file must exist" + echo " example: output.txt" + echo " File to write the output to, optionally gzipped." + echo "" + echo "Viash built in Computational Requirements:" + echo " ---cpus=INT" + echo " Number of CPUs to use" + echo " ---memory=STRING" + echo " Amount of memory to use. Examples: 4GB, 3MiB." + echo "" + echo "Viash built in Docker:" + echo " ---setup=STRATEGY" + echo " Setup the docker container. Options are: alwaysbuild, alwayscachedbuild, ifneedbebuild, ifneedbecachedbuild, alwayspull, alwayspullelsebuild, alwayspullelsecachedbuild, ifneedbepull, ifneedbepullelsebuild, ifneedbepullelsecachedbuild, push, pushifnotpresent, donothing." + echo " Default: ifneedbepullelsecachedbuild" + echo " ---dockerfile" + echo " Print the dockerfile to stdout." + echo " ---docker_run_args=ARG" + echo " Provide runtime arguments to Docker. See the documentation on \`docker run\` for more information." + echo " ---docker_image_id" + echo " Print the docker image id to stdout." + echo " ---debug" + echo " Enter the docker container for debugging purposes." + echo "" + echo "Viash built in Engines:" + echo " ---engine=ENGINE_ID" + echo " Specify the engine to use. Options are: docker, native." 
+ echo " Default: docker" +} + # initialise array VIASH_POSITIONAL_ARGS='' diff --git a/target/executable/csv2fasta/.config.vsh.yaml b/target/executable/csv2fasta/.config.vsh.yaml index c5adb34..d7699ae 100644 --- a/target/executable/csv2fasta/.config.vsh.yaml +++ b/target/executable/csv2fasta/.config.vsh.yaml @@ -1,5 +1,36 @@ name: "csv2fasta" version: "main" +authors: +- name: "Dries Schaumont" + roles: + - "author" + - "maintainer" + info: + links: + email: "dries@data-intuitive.com" + github: "DriesSchaumont" + orcid: "0000-0002-4389-0440" + linkedin: "dries-schaumont" + organizations: + - name: "Data Intuitive" + href: "https://www.data-intuitive.com" + role: "Data Scientist" +- name: "Robrecht Cannoodt" + roles: + - "reviewer" + info: + links: + email: "robrecht@data-intuitive.com" + github: "rcannood" + orcid: "0000-0003-3641-729X" + linkedin: "robrechtcannoodt" + organizations: + - name: "Data Intuitive" + href: "https://www.data-intuitive.com" + role: "Data Science Engineer" + - name: "Open Problems" + href: "https://openproblems.bio" + role: "Core Member" argument_groups: - name: "Inputs" arguments: @@ -104,6 +135,7 @@ resources: - type: "python_script" path: "script.py" is_executable: true +summary: "Convert a CSV file to FASTA entries" description: "Convert two columns from a CSV file to FASTA entries. The CSV file can\n\ contain an optional header and each row (other than the header) becomes\na single\ \ FASTA record. 
One of the two columns will be used as the names\nfor the FASTA\ @@ -116,6 +148,9 @@ test_resources: is_executable: true info: null status: "enabled" +scope: + image: "public" + target: "public" requirements: commands: - "ps" @@ -221,16 +256,16 @@ build_info: engine: "docker|native" output: "target/executable/csv2fasta" executable: "target/executable/csv2fasta/csv2fasta" - viash_version: "0.9.0" - git_commit: "3c8413009764e3a6839e3e8b038857caf7047593" + viash_version: "0.9.3" + git_commit: "8f9353f15e4d6952eca57e896f962a60b42b0a3c" git_remote: "https://github.com/viash-hub/craftbox" - git_tag: "v0.1.0-3-g3c84130" + git_tag: "v0.1.0-4-g8f9353f" package_config: name: "craftbox" version: "main" description: "A collection of custom-tailored scripts and applied tools.\n" info: null - viash_version: "0.9.0" + viash_version: "0.9.3" source: "src" target: "target" config_mods: diff --git a/target/executable/csv2fasta/csv2fasta b/target/executable/csv2fasta/csv2fasta index ea485d5..68d3d50 100755 --- a/target/executable/csv2fasta/csv2fasta +++ b/target/executable/csv2fasta/csv2fasta @@ -2,7 +2,7 @@ # csv2fasta main # -# This wrapper script is auto-generated by viash 0.9.0 and is thus a derivative +# This wrapper script is auto-generated by viash 0.9.3 and is thus a derivative # work thereof. This software comes with ABSOLUTELY NO WARRANTY from Data # Intuitive. # @@ -10,6 +10,10 @@ # authors of this component should specify the license in the header of such # files, or include a separate license file detailing the licenses of all included # files. +# +# Component authors: +# * Dries Schaumont (author, maintainer) +# * Robrecht Cannoodt (reviewer) set -e @@ -169,78 +173,6 @@ VIASH_META_CONFIG="$VIASH_META_RESOURCES_DIR/.config.vsh.yaml" VIASH_META_TEMP_DIR="$VIASH_TEMP" -# ViashHelp: Display helpful explanation about this executable -function ViashHelp { - echo "csv2fasta main" - echo "" - echo "Convert two columns from a CSV file to FASTA entries. 
The CSV file can" - echo "contain an optional header and each row (other than the header) becomes" - echo "a single FASTA record. One of the two columns will be used as the names" - echo "for the FASTA entries, while the other become the sequences. The sequences" - echo "column must only contain characters that are valid IUPAC notation for" - echo "nucleotides or a group thereof (wildcard characters)." - echo "" - echo "Inputs:" - echo " --input" - echo " type: file, required parameter, file must exist" - echo " example: barcodes.csv" - echo " CSV file to be processed." - echo "" - echo " --header" - echo " type: boolean_true" - echo " Parse the first line of the CSV file as a header." - echo "" - echo "CSV dialect options:" - echo " Options that can be used to override the automatically detected" - echo " dialect of the CSV file." - echo "" - echo " --delimiter" - echo " type: string" - echo " Overwrite the column delimiter character." - echo "" - echo " --quote_character" - echo " type: string" - echo " Overwrite the character used to denote the start and end of a quoted" - echo " item." - echo "" - echo "CSV column arguments:" - echo " Parameters for the selection of columns from the CSV file." - echo " Only required when your CSV file contains more than 2 columns," - echo " otherwise the first column will be used for the FASTA header" - echo " and the second for the FASTA nucleotide sequences. This default" - echo " can still be overwritten by using the options below." - echo "" - echo " --sequence_column" - echo " type: string" - echo " Name of the column containing the sequences. Implies 'header'." - echo " Cannot be used together with 'sequence_column_index'." - echo "" - echo " --name_column" - echo " type: string" - echo " Name of the column describing the FASTA headers. Implies 'header'." - echo " Cannot be used together with 'name_column_index'." 
- echo "" - echo " --sequence_column_index" - echo " type: integer" - echo " min: 0" - echo " Index of the column to use as the FASTA sequences, counter from the left" - echo " and" - echo " starting from 0. Cannot be used in combination with the" - echo " 'sequence_column' argument." - echo "" - echo " --name_column_index" - echo " type: integer" - echo " min: 0" - echo " Index of the column to use as the FASTA headers, counter from the left" - echo " and" - echo " starting from 0. Cannot be used in combination with 'name_column'." - echo "" - echo "Outputs:" - echo " --output" - echo " type: file, output, file must exist" - echo " example: barcodes.fasta" - echo " Output fasta file." -} # initialise variables VIASH_MODE='run' @@ -524,10 +456,11 @@ RUN apt-get update && \ RUN pip install --upgrade pip && \ pip install --upgrade --no-cache-dir "dnaio" +LABEL org.opencontainers.image.authors="Dries Schaumont, Robrecht Cannoodt" LABEL org.opencontainers.image.description="Companion container for running component csv2fasta" -LABEL org.opencontainers.image.created="2025-04-08T07:25:22Z" +LABEL org.opencontainers.image.created="2025-04-08T07:49:56Z" LABEL org.opencontainers.image.source="https://github.com/viash-hub/craftbox" -LABEL org.opencontainers.image.revision="3c8413009764e3a6839e3e8b038857caf7047593" +LABEL org.opencontainers.image.revision="8f9353f15e4d6952eca57e896f962a60b42b0a3c" LABEL org.opencontainers.image.version="main" VIASHDOCKER @@ -642,6 +575,104 @@ fi # initialise docker variables VIASH_DOCKER_RUN_ARGS=(-i --rm) + +# ViashHelp: Display helpful explanation about this executable +function ViashHelp { + echo "csv2fasta main" + echo "" + echo "Convert two columns from a CSV file to FASTA entries. The CSV file can" + echo "contain an optional header and each row (other than the header) becomes" + echo "a single FASTA record. One of the two columns will be used as the names" + echo "for the FASTA entries, while the other become the sequences. 
The sequences" + echo "column must only contain characters that are valid IUPAC notation for" + echo "nucleotides or a group thereof (wildcard characters)." + echo "" + echo "Inputs:" + echo " --input" + echo " type: file, required parameter, file must exist" + echo " example: barcodes.csv" + echo " CSV file to be processed." + echo "" + echo " --header" + echo " type: boolean_true" + echo " Parse the first line of the CSV file as a header." + echo "" + echo "CSV dialect options:" + echo " Options that can be used to override the automatically detected" + echo " dialect of the CSV file." + echo "" + echo " --delimiter" + echo " type: string" + echo " Overwrite the column delimiter character." + echo "" + echo " --quote_character" + echo " type: string" + echo " Overwrite the character used to denote the start and end of a quoted" + echo " item." + echo "" + echo "CSV column arguments:" + echo " Parameters for the selection of columns from the CSV file." + echo " Only required when your CSV file contains more than 2 columns," + echo " otherwise the first column will be used for the FASTA header" + echo " and the second for the FASTA nucleotide sequences. This default" + echo " can still be overwritten by using the options below." + echo "" + echo " --sequence_column" + echo " type: string" + echo " Name of the column containing the sequences. Implies 'header'." + echo " Cannot be used together with 'sequence_column_index'." + echo "" + echo " --name_column" + echo " type: string" + echo " Name of the column describing the FASTA headers. Implies 'header'." + echo " Cannot be used together with 'name_column_index'." + echo "" + echo " --sequence_column_index" + echo " type: integer" + echo " min: 0" + echo " Index of the column to use as the FASTA sequences, counter from the left" + echo " and" + echo " starting from 0. Cannot be used in combination with the" + echo " 'sequence_column' argument." 
+ echo "" + echo " --name_column_index" + echo " type: integer" + echo " min: 0" + echo " Index of the column to use as the FASTA headers, counter from the left" + echo " and" + echo " starting from 0. Cannot be used in combination with 'name_column'." + echo "" + echo "Outputs:" + echo " --output" + echo " type: file, output, file must exist" + echo " example: barcodes.fasta" + echo " Output fasta file." + echo "" + echo "Viash built in Computational Requirements:" + echo " ---cpus=INT" + echo " Number of CPUs to use" + echo " ---memory=STRING" + echo " Amount of memory to use. Examples: 4GB, 3MiB." + echo "" + echo "Viash built in Docker:" + echo " ---setup=STRATEGY" + echo " Setup the docker container. Options are: alwaysbuild, alwayscachedbuild, ifneedbebuild, ifneedbecachedbuild, alwayspull, alwayspullelsebuild, alwayspullelsecachedbuild, ifneedbepull, ifneedbepullelsebuild, ifneedbepullelsecachedbuild, push, pushifnotpresent, donothing." + echo " Default: ifneedbepullelsecachedbuild" + echo " ---dockerfile" + echo " Print the dockerfile to stdout." + echo " ---docker_run_args=ARG" + echo " Provide runtime arguments to Docker. See the documentation on \`docker run\` for more information." + echo " ---docker_image_id" + echo " Print the docker image id to stdout." + echo " ---debug" + echo " Enter the docker container for debugging purposes." + echo "" + echo "Viash built in Engines:" + echo " ---engine=ENGINE_ID" + echo " Specify the engine to use. Options are: docker, native." 
+ echo " Default: docker" +} + # initialise array VIASH_POSITIONAL_ARGS='' diff --git a/target/executable/sync_resources/.config.vsh.yaml b/target/executable/sync_resources/.config.vsh.yaml index 72f6dc3..8c4b843 100644 --- a/target/executable/sync_resources/.config.vsh.yaml +++ b/target/executable/sync_resources/.config.vsh.yaml @@ -1,5 +1,36 @@ name: "sync_resources" version: "main" +authors: +- name: "Robrecht Cannoodt" + roles: + - "author" + - "maintainer" + info: + links: + email: "robrecht@data-intuitive.com" + github: "rcannood" + orcid: "0000-0003-3641-729X" + linkedin: "robrechtcannoodt" + organizations: + - name: "Data Intuitive" + href: "https://www.data-intuitive.com" + role: "Data Science Engineer" + - name: "Open Problems" + href: "https://openproblems.bio" + role: "Core Member" +- name: "Dries Schaumont" + roles: + - "reviewer" + info: + links: + email: "dries@data-intuitive.com" + github: "DriesSchaumont" + orcid: "0000-0002-4389-0440" + linkedin: "dries-schaumont" + organizations: + - name: "Data Intuitive" + href: "https://www.data-intuitive.com" + role: "Data Scientist" argument_groups: - name: "Inputs" arguments: @@ -53,7 +84,10 @@ resources: - type: "bash_script" path: "script.sh" is_executable: true -description: "Sync a Viash package's test resources to the local filesystem" +summary: "Sync a Viash package's test resources to the local filesystem" +description: "Sync a Viash package's test resources to the local filesystem based\ + \ on the\nthe `.info.test_resources` field in the `_viash.yaml` file. 
This is useful\ + \ for\ntesting and debugging purposes.\n" usage: "sync_resources\nsync_resources --input _viash.yaml --output .\n" test_resources: - type: "bash_script" @@ -61,6 +95,9 @@ test_resources: is_executable: true info: null status: "enabled" +scope: + image: "public" + target: "public" requirements: commands: - "ps" @@ -159,16 +196,16 @@ build_info: engine: "docker|native" output: "target/executable/sync_resources" executable: "target/executable/sync_resources/sync_resources" - viash_version: "0.9.0" - git_commit: "3c8413009764e3a6839e3e8b038857caf7047593" + viash_version: "0.9.3" + git_commit: "8f9353f15e4d6952eca57e896f962a60b42b0a3c" git_remote: "https://github.com/viash-hub/craftbox" - git_tag: "v0.1.0-3-g3c84130" + git_tag: "v0.1.0-4-g8f9353f" package_config: name: "craftbox" version: "main" description: "A collection of custom-tailored scripts and applied tools.\n" info: null - viash_version: "0.9.0" + viash_version: "0.9.3" source: "src" target: "target" config_mods: diff --git a/target/executable/sync_resources/sync_resources b/target/executable/sync_resources/sync_resources index efa91a4..dd958c1 100755 --- a/target/executable/sync_resources/sync_resources +++ b/target/executable/sync_resources/sync_resources @@ -2,7 +2,7 @@ # sync_resources main # -# This wrapper script is auto-generated by viash 0.9.0 and is thus a derivative +# This wrapper script is auto-generated by viash 0.9.3 and is thus a derivative # work thereof. This software comes with ABSOLUTELY NO WARRANTY from Data # Intuitive. # @@ -10,6 +10,10 @@ # authors of this component should specify the license in the header of such # files, or include a separate license file detailing the licenses of all included # files. 
+# +# Component authors: +# * Robrecht Cannoodt (author, maintainer) +# * Dries Schaumont (reviewer) set -e @@ -169,38 +173,6 @@ VIASH_META_CONFIG="$VIASH_META_RESOURCES_DIR/.config.vsh.yaml" VIASH_META_TEMP_DIR="$VIASH_TEMP" -# ViashHelp: Display helpful explanation about this executable -function ViashHelp { - echo "sync_resources main" - echo "" - echo "Sync a Viash package's test resources to the local filesystem" - echo "" - echo "Usage:" - echo "sync_resources" - echo "sync_resources --input _viash.yaml --output ." - echo "" - echo "Inputs:" - echo " -i, --input" - echo " type: file, file must exist" - echo " default: _viash.yaml" - echo " Path to the _viash.yaml project configuration file." - echo "" - echo "Outputs:" - echo " -o, --output" - echo " type: file, output, file must exist" - echo " default: ." - echo " Path to the directory where the resources will be synced to." - echo "" - echo "Arguments:" - echo " --dryrun" - echo " type: boolean_true" - echo " Does not display the operations performed from the specified command." - echo "" - echo " --exclude" - echo " type: string, multiple values allowed" - echo " Exclude all files or objects from the command that matches the specified" - echo " pattern." 
-} # initialise variables VIASH_MODE='run' @@ -481,10 +453,11 @@ RUN apk add --no-cache bash rclone yq RUN rclone config create s3 s3 anonymous=true RUN rclone config create gs gcs anonymous=true +LABEL org.opencontainers.image.authors="Robrecht Cannoodt, Dries Schaumont" LABEL org.opencontainers.image.description="Companion container for running component sync_resources" -LABEL org.opencontainers.image.created="2025-04-08T07:25:22Z" +LABEL org.opencontainers.image.created="2025-04-08T07:49:55Z" LABEL org.opencontainers.image.source="https://github.com/viash-hub/craftbox" -LABEL org.opencontainers.image.revision="3c8413009764e3a6839e3e8b038857caf7047593" +LABEL org.opencontainers.image.revision="8f9353f15e4d6952eca57e896f962a60b42b0a3c" LABEL org.opencontainers.image.version="main" VIASHDOCKER @@ -599,6 +572,66 @@ fi # initialise docker variables VIASH_DOCKER_RUN_ARGS=(-i --rm) + +# ViashHelp: Display helpful explanation about this executable +function ViashHelp { + echo "sync_resources main" + echo "" + echo "Sync a Viash package's test resources to the local filesystem based on the" + echo "the \`.info.test_resources\` field in the \`_viash.yaml\` file. This is useful for" + echo "testing and debugging purposes." + echo "" + echo "Usage:" + echo "sync_resources" + echo "sync_resources --input _viash.yaml --output ." + echo "" + echo "Inputs:" + echo " -i, --input" + echo " type: file, file must exist" + echo " default: _viash.yaml" + echo " Path to the _viash.yaml project configuration file." + echo "" + echo "Outputs:" + echo " -o, --output" + echo " type: file, output, file must exist" + echo " default: ." + echo " Path to the directory where the resources will be synced to." + echo "" + echo "Arguments:" + echo " --dryrun" + echo " type: boolean_true" + echo " Does not display the operations performed from the specified command." 
+ echo "" + echo " --exclude" + echo " type: string, multiple values allowed" + echo " Exclude all files or objects from the command that matches the specified" + echo " pattern." + echo "" + echo "Viash built in Computational Requirements:" + echo " ---cpus=INT" + echo " Number of CPUs to use" + echo " ---memory=STRING" + echo " Amount of memory to use. Examples: 4GB, 3MiB." + echo "" + echo "Viash built in Docker:" + echo " ---setup=STRATEGY" + echo " Setup the docker container. Options are: alwaysbuild, alwayscachedbuild, ifneedbebuild, ifneedbecachedbuild, alwayspull, alwayspullelsebuild, alwayspullelsecachedbuild, ifneedbepull, ifneedbepullelsebuild, ifneedbepullelsecachedbuild, push, pushifnotpresent, donothing." + echo " Default: ifneedbepullelsecachedbuild" + echo " ---dockerfile" + echo " Print the dockerfile to stdout." + echo " ---docker_run_args=ARG" + echo " Provide runtime arguments to Docker. See the documentation on \`docker run\` for more information." + echo " ---docker_image_id" + echo " Print the docker image id to stdout." + echo " ---debug" + echo " Enter the docker container for debugging purposes." + echo "" + echo "Viash built in Engines:" + echo " ---engine=ENGINE_ID" + echo " Specify the engine to use. Options are: docker, native." 
+ echo " Default: docker" +} + # initialise array VIASH_POSITIONAL_ARGS='' diff --git a/target/executable/untar/.config.vsh.yaml b/target/executable/untar/.config.vsh.yaml index 40493c7..16ee9b8 100644 --- a/target/executable/untar/.config.vsh.yaml +++ b/target/executable/untar/.config.vsh.yaml @@ -1,5 +1,36 @@ name: "untar" version: "main" +authors: +- name: "Dries Schaumont" + roles: + - "author" + - "maintainer" + info: + links: + email: "dries@data-intuitive.com" + github: "DriesSchaumont" + orcid: "0000-0002-4389-0440" + linkedin: "dries-schaumont" + organizations: + - name: "Data Intuitive" + href: "https://www.data-intuitive.com" + role: "Data Scientist" +- name: "Robrecht Cannoodt" + roles: + - "reviewer" + info: + links: + email: "robrecht@data-intuitive.com" + github: "rcannood" + orcid: "0000-0003-3641-729X" + linkedin: "robrechtcannoodt" + organizations: + - name: "Data Intuitive" + href: "https://www.data-intuitive.com" + role: "Data Science Engineer" + - name: "Open Problems" + href: "https://openproblems.bio" + role: "Core Member" argument_groups: - name: "Input arguments" arguments: @@ -44,6 +75,7 @@ resources: - type: "bash_script" path: "script.sh" is_executable: true +summary: "Unpack a .tar file" description: "Unpack a .tar file. 
When the contents of the .tar file is just a single\ \ directory,\nput the contents of the directory into the output folder instead of\ \ that directory.\n" @@ -53,6 +85,9 @@ test_resources: is_executable: true info: null status: "enabled" +scope: + image: "public" + target: "public" requirements: commands: - "ps" @@ -146,16 +181,16 @@ build_info: engine: "docker|native" output: "target/executable/untar" executable: "target/executable/untar/untar" - viash_version: "0.9.0" - git_commit: "3c8413009764e3a6839e3e8b038857caf7047593" + viash_version: "0.9.3" + git_commit: "8f9353f15e4d6952eca57e896f962a60b42b0a3c" git_remote: "https://github.com/viash-hub/craftbox" - git_tag: "v0.1.0-3-g3c84130" + git_tag: "v0.1.0-4-g8f9353f" package_config: name: "craftbox" version: "main" description: "A collection of custom-tailored scripts and applied tools.\n" info: null - viash_version: "0.9.0" + viash_version: "0.9.3" source: "src" target: "target" config_mods: diff --git a/target/executable/untar/untar b/target/executable/untar/untar index c5f2684..1cdef17 100755 --- a/target/executable/untar/untar +++ b/target/executable/untar/untar @@ -2,7 +2,7 @@ # untar main # -# This wrapper script is auto-generated by viash 0.9.0 and is thus a derivative +# This wrapper script is auto-generated by viash 0.9.3 and is thus a derivative # work thereof. This software comes with ABSOLUTELY NO WARRANTY from Data # Intuitive. # @@ -10,6 +10,10 @@ # authors of this component should specify the license in the header of such # files, or include a separate license file detailing the licenses of all included # files. +# +# Component authors: +# * Dries Schaumont (author, maintainer) +# * Robrecht Cannoodt (reviewer) set -e @@ -169,32 +173,6 @@ VIASH_META_CONFIG="$VIASH_META_RESOURCES_DIR/.config.vsh.yaml" VIASH_META_TEMP_DIR="$VIASH_TEMP" -# ViashHelp: Display helpful explanation about this executable -function ViashHelp { - echo "untar main" - echo "" - echo "Unpack a .tar file. 
When the contents of the .tar file is just a single" - echo "directory," - echo "put the contents of the directory into the output folder instead of that" - echo "directory." - echo "" - echo "Input arguments:" - echo " --input" - echo " type: file, required parameter, file must exist" - echo " Tarball file to be unpacked." - echo "" - echo "Output arguments:" - echo " --output" - echo " type: file, required parameter, output, file must exist" - echo " Directory to write the contents of the .tar file to." - echo "" - echo "Other arguments:" - echo " -e, --exclude" - echo " type: string" - echo " example: docs/figures" - echo " Prevents any file or member whose name matches the shell wildcard" - echo " (pattern) from being extracted." -} # initialise variables VIASH_MODE='run' @@ -475,10 +453,11 @@ RUN apt-get update && \ DEBIAN_FRONTEND=noninteractive apt-get install -y procps && \ rm -rf /var/lib/apt/lists/* +LABEL org.opencontainers.image.authors="Dries Schaumont, Robrecht Cannoodt" LABEL org.opencontainers.image.description="Companion container for running component untar" -LABEL org.opencontainers.image.created="2025-04-08T07:25:22Z" +LABEL org.opencontainers.image.created="2025-04-08T07:49:56Z" LABEL org.opencontainers.image.source="https://github.com/viash-hub/craftbox" -LABEL org.opencontainers.image.revision="3c8413009764e3a6839e3e8b038857caf7047593" +LABEL org.opencontainers.image.revision="8f9353f15e4d6952eca57e896f962a60b42b0a3c" LABEL org.opencontainers.image.version="main" VIASHDOCKER @@ -593,6 +572,58 @@ fi # initialise docker variables VIASH_DOCKER_RUN_ARGS=(-i --rm) + +# ViashHelp: Display helpful explanation about this executable +function ViashHelp { + echo "untar main" + echo "" + echo "Unpack a .tar file. When the contents of the .tar file is just a single" + echo "directory," + echo "put the contents of the directory into the output folder instead of that" + echo "directory." 
+ echo "" + echo "Input arguments:" + echo " --input" + echo " type: file, required parameter, file must exist" + echo " Tarball file to be unpacked." + echo "" + echo "Output arguments:" + echo " --output" + echo " type: file, required parameter, output, file must exist" + echo " Directory to write the contents of the .tar file to." + echo "" + echo "Other arguments:" + echo " -e, --exclude" + echo " type: string" + echo " example: docs/figures" + echo " Prevents any file or member whose name matches the shell wildcard" + echo " (pattern) from being extracted." + echo "" + echo "Viash built in Computational Requirements:" + echo " ---cpus=INT" + echo " Number of CPUs to use" + echo " ---memory=STRING" + echo " Amount of memory to use. Examples: 4GB, 3MiB." + echo "" + echo "Viash built in Docker:" + echo " ---setup=STRATEGY" + echo " Setup the docker container. Options are: alwaysbuild, alwayscachedbuild, ifneedbebuild, ifneedbecachedbuild, alwayspull, alwayspullelsebuild, alwayspullelsecachedbuild, ifneedbepull, ifneedbepullelsebuild, ifneedbepullelsecachedbuild, push, pushifnotpresent, donothing." + echo " Default: ifneedbepullelsecachedbuild" + echo " ---dockerfile" + echo " Print the dockerfile to stdout." + echo " ---docker_run_args=ARG" + echo " Provide runtime arguments to Docker. See the documentation on \`docker run\` for more information." + echo " ---docker_image_id" + echo " Print the docker image id to stdout." + echo " ---debug" + echo " Enter the docker container for debugging purposes." + echo "" + echo "Viash built in Engines:" + echo " ---engine=ENGINE_ID" + echo " Specify the engine to use. Options are: docker, native." 
+ echo " Default: docker" +} + # initialise array VIASH_POSITIONAL_ARGS='' diff --git a/target/nextflow/concat_text/.config.vsh.yaml b/target/nextflow/concat_text/.config.vsh.yaml index 1bf5f60..c9bd25a 100644 --- a/target/nextflow/concat_text/.config.vsh.yaml +++ b/target/nextflow/concat_text/.config.vsh.yaml @@ -64,6 +64,7 @@ resources: - type: "bash_script" path: "script.sh" is_executable: true +summary: "Concatenate a number of text files" description: "Concatenate a number of text files, handle gzipped text files gracefully\ \ and\noptionally gzip the output text file.\n\nThis component is useful for concatening\ \ fastq files from different lanes, for instance.\n" @@ -76,6 +77,9 @@ info: \ of zipped and plain input files\n 2. Allow to specify a compression algorithm\ \ for the output\n" status: "enabled" +scope: + image: "public" + target: "public" requirements: commands: - "ps" @@ -170,16 +174,16 @@ build_info: engine: "docker|native" output: "target/nextflow/concat_text" executable: "target/nextflow/concat_text/main.nf" - viash_version: "0.9.0" - git_commit: "3c8413009764e3a6839e3e8b038857caf7047593" + viash_version: "0.9.3" + git_commit: "8f9353f15e4d6952eca57e896f962a60b42b0a3c" git_remote: "https://github.com/viash-hub/craftbox" - git_tag: "v0.1.0-3-g3c84130" + git_tag: "v0.1.0-4-g8f9353f" package_config: name: "craftbox" version: "main" description: "A collection of custom-tailored scripts and applied tools.\n" info: null - viash_version: "0.9.0" + viash_version: "0.9.3" source: "src" target: "target" config_mods: diff --git a/target/nextflow/concat_text/main.nf b/target/nextflow/concat_text/main.nf index e3319d6..ec2426f 100644 --- a/target/nextflow/concat_text/main.nf +++ b/target/nextflow/concat_text/main.nf @@ -1,6 +1,6 @@ // concat_text main // -// This wrapper script is auto-generated by viash 0.9.0 and is thus a derivative +// This wrapper script is auto-generated by viash 0.9.3 and is thus a derivative // work thereof. 
This software comes with ABSOLUTELY NO WARRANTY from Data // Intuitive. // @@ -177,7 +177,7 @@ def _checkArgumentType(String stage, Map par, Object value, String errorIdentifi Map _processInputValues(Map inputs, Map config, String id, String key) { if (!workflow.stubRun) { config.allArguments.each { arg -> - if (arg.required) { + if (arg.required && arg.direction == "input") { assert inputs.containsKey(arg.plainName) && inputs.get(arg.plainName) != null : "Error in module '${key}' id '${id}': required input argument '${arg.plainName}' is missing" } @@ -196,15 +196,8 @@ Map _processInputValues(Map inputs, Map config, String id, String key) { } // helper file: 'src/main/resources/io/viash/runners/nextflow/arguments/_processOutputValues.nf' -Map _processOutputValues(Map outputs, Map config, String id, String key) { +Map _checkValidOutputArgument(Map outputs, Map config, String id, String key) { if (!workflow.stubRun) { - config.allArguments.each { arg -> - if (arg.direction == "output" && arg.required) { - assert outputs.containsKey(arg.plainName) && outputs.get(arg.plainName) != null : - "Error in module '${key}' id '${id}': required output argument '${arg.plainName}' is missing" - } - } - outputs = outputs.collectEntries { name, value -> def par = config.allArguments.find { it.plainName == name && it.direction == "output" } assert par != null : "Error in module '${key}' id '${id}': '${name}' is not a valid output argument" @@ -217,6 +210,16 @@ Map _processOutputValues(Map outputs, Map config, String id, String key) { return outputs } +void _checkAllRequiredOuputsPresent(Map outputs, Map config, String id, String key) { + if (!workflow.stubRun) { + config.allArguments.each { arg -> + if (arg.direction == "output" && arg.required) { + assert outputs.containsKey(arg.plainName) && outputs.get(arg.plainName) != null : + "Error in module '${key}' id '${id}': required output argument '${arg.plainName}' is missing" + } + } + } +} // helper file: 
'src/main/resources/io/viash/runners/nextflow/channel/IDChecker.nf' class IDChecker { final def items = [] as Set @@ -1670,6 +1673,162 @@ def joinStates(Closure apply_) { } return joinStatesWf } +// helper file: 'src/main/resources/io/viash/runners/nextflow/states/publishFiles.nf' +def publishFiles(Map args) { + def key_ = args.get("key") + + assert key_ != null : "publishFiles: key must be specified" + + workflow publishFilesWf { + take: input_ch + main: + input_ch + | map { tup -> + def id_ = tup[0] + def state_ = tup[1] + + // the input files and the target output filenames + def inputoutputFilenames_ = collectInputOutputPaths(state_, id_ + "." + key_).transpose() + def inputFiles_ = inputoutputFilenames_[0] + def outputFilenames_ = inputoutputFilenames_[1] + + [id_, inputFiles_, outputFilenames_] + } + | publishFilesProc + emit: input_ch + } + return publishFilesWf +} + +process publishFilesProc { + // todo: check publishpath? + publishDir path: "${getPublishDir()}/", mode: "copy" + tag "$id" + input: + tuple val(id), path(inputFiles, stageAs: "_inputfile?/*"), val(outputFiles) + output: + tuple val(id), path{outputFiles} + script: + def copyCommands = [ + inputFiles instanceof List ? inputFiles : [inputFiles], + outputFiles instanceof List ? 
outputFiles : [outputFiles] + ] + .transpose() + .collectMany{infile, outfile -> + if (infile.toString() != outfile.toString()) { + [ + "[ -d \"\$(dirname '${outfile.toString()}')\" ] || mkdir -p \"\$(dirname '${outfile.toString()}')\"", + "cp -r '${infile.toString()}' '${outfile.toString()}'" + ] + } else { + // no need to copy if infile is the same as outfile + [] + } + } + """ + echo "Copying output files to destination folder" + ${copyCommands.join("\n ")} + """ +} + + +// this assumes that the state contains no values other than those specified in the config +def publishFilesByConfig(Map args) { + def config = args.get("config") + assert config != null : "publishFilesByConfig: config must be specified" + + def key_ = args.get("key", config.name) + assert key_ != null : "publishFilesByConfig: key must be specified" + + workflow publishFilesSimpleWf { + take: input_ch + main: + input_ch + | map { tup -> + def id_ = tup[0] + def state_ = tup[1] // e.g. [output: new File("myoutput.h5ad"), k: 10] + def origState_ = tup[2] // e.g. 
[output: '$id.$key.foo.h5ad'] + + + // the processed state is a list of [key, value, inputPath, outputFilename] tuples, where + // - key is a String + // - value is any object that can be serialized to a Yaml (so a String/Integer/Long/Double/Boolean, a List, a Map, or a Path) + // - inputPath is a List[Path] + // - outputFilename is a List[String] + // - (inputPath, outputFilename) are the files that will be copied from src to dest (relative to the state.yaml) + def processedState = + config.allArguments + .findAll { it.direction == "output" } + .collectMany { par -> + def plainName_ = par.plainName + // if the state does not contain the key, it's an + // optional argument for which the component did + // not generate any output OR multiple channels were emitted + // and the output was just not added to using the channel + // that is now being parsed + if (!state_.containsKey(plainName_)) { + return [] + } + def value = state_[plainName_] + // if the parameter is not a file, it should be stored + // in the state as-is, but is not something that needs + // to be copied from the source path to the dest path + if (par.type != "file") { + return [[inputPath: [], outputFilename: []]] + } + // if the orig state does not contain this filename, + // it's an optional argument for which the user specified + // that it should not be returned as a state + if (!origState_.containsKey(plainName_)) { + return [] + } + def filenameTemplate = origState_[plainName_] + // if the parameter is multiple: true, fetch the template + if (par.multiple && filenameTemplate instanceof List) { + filenameTemplate = filenameTemplate[0] + } + // instantiate the template + def filename = filenameTemplate + .replaceAll('\\$id', id_) + .replaceAll('\\$\\{id\\}', id_) + .replaceAll('\\$key', key_) + .replaceAll('\\$\\{key\\}', key_) + if (par.multiple) { + // if the parameter is multiple: true, the filename + // should contain a wildcard '*' that is replaced with + // the index of the file + assert 
filename.contains("*") : "Module '${key_}' id '${id_}': Multiple output files specified, but no wildcard '*' in the filename: ${filename}" + def outputPerFile = value.withIndex().collect{ val, ix -> + def filename_ix = filename.replace("*", ix.toString()) + def inputPath = val instanceof File ? val.toPath() : val + [inputPath: inputPath, outputFilename: filename_ix] + } + def transposedOutputs = ["inputPath", "outputFilename"].collectEntries{ key -> + [key, outputPerFile.collect{dic -> dic[key]}] + } + return [[key: plainName_] + transposedOutputs] + } else { + def value_ = java.nio.file.Paths.get(filename) + def inputPath = value instanceof File ? value.toPath() : value + return [[inputPath: [inputPath], outputFilename: [filename]]] + } + } + + def inputPaths = processedState.collectMany{it.inputPath} + def outputFilenames = processedState.collectMany{it.outputFilename} + + + [id_, inputPaths, outputFilenames] + } + | publishFilesProc + emit: input_ch + } + return publishFilesSimpleWf +} + + + + // helper file: 'src/main/resources/io/viash/runners/nextflow/states/publishStates.nf' def collectFiles(obj) { if (obj instanceof java.io.File || obj instanceof Path) { @@ -1727,8 +1886,6 @@ def publishStates(Map args) { // the input files and the target output filenames def inputoutputFilenames_ = collectInputOutputPaths(state_, id_ + "." 
+ key_).transpose() - def inputFiles_ = inputoutputFilenames_[0] - def outputFilenames_ = inputoutputFilenames_[1] def yamlFilename = yamlTemplate_ .replaceAll('\\$id', id_) @@ -1741,7 +1898,7 @@ def publishStates(Map args) { // convert state to yaml blob def yamlBlob_ = toRelativeTaggedYamlBlob([id: id_] + state_, java.nio.file.Paths.get(yamlFilename)) - [id_, yamlBlob_, yamlFilename, inputFiles_, outputFilenames_] + [id_, yamlBlob_, yamlFilename] } | publishStatesProc emit: input_ch @@ -1753,33 +1910,17 @@ process publishStatesProc { publishDir path: "${getPublishDir()}/", mode: "copy" tag "$id" input: - tuple val(id), val(yamlBlob), val(yamlFile), path(inputFiles, stageAs: "_inputfile?/*"), val(outputFiles) + tuple val(id), val(yamlBlob), val(yamlFile) output: - tuple val(id), path{[yamlFile] + outputFiles} + tuple val(id), path{[yamlFile]} script: - def copyCommands = [ - inputFiles instanceof List ? inputFiles : [inputFiles], - outputFiles instanceof List ? outputFiles : [outputFiles] - ] - .transpose() - .collectMany{infile, outfile -> - if (infile.toString() != outfile.toString()) { - [ - "[ -d \"\$(dirname '${outfile.toString()}')\" ] || mkdir -p \"\$(dirname '${outfile.toString()}')\"", - "cp -r '${infile.toString()}' '${outfile.toString()}'" - ] - } else { - // no need to copy if infile is the same as outfile - [] - } - } """ -mkdir -p "\$(dirname '${yamlFile}')" -echo "Storing state as yaml" -echo '${yamlBlob}' > '${yamlFile}' -echo "Copying output files to destination folder" -${copyCommands.join("\n ")} -""" + mkdir -p "\$(dirname '${yamlFile}')" + echo "Storing state as yaml" + cat > '${yamlFile}' << HERE +${yamlBlob} +HERE + """ } @@ -1810,13 +1951,10 @@ def publishStatesByConfig(Map args) { .replaceAll('\\$\\{key\\}', key_) def yamlDir = java.nio.file.Paths.get(yamlFilename).getParent() - // the processed state is a list of [key, value, inputPath, outputFilename] tuples, where + // the processed state is a list of [key, value] tuples, where // - key 
is a String // - value is any object that can be serialized to a Yaml (so a String/Integer/Long/Double/Boolean, a List, a Map, or a Path) - // - inputPath is a List[Path] - // - outputFilename is a List[String] // - (key, value) are the tuples that will be saved to the state.yaml file - // - (inputPath, outputFilename) are the files that will be copied from src to dest (relative to the state.yaml) def processedState = config.allArguments .findAll { it.direction == "output" } @@ -1833,7 +1971,7 @@ def publishStatesByConfig(Map args) { // in the state as-is, but is not something that needs // to be copied from the source path to the dest path if (par.type != "file") { - return [[key: plainName_, value: value, inputPath: [], outputFilename: []]] + return [[key: plainName_, value: value]] } // if the orig state does not contain this filename, // it's an optional argument for which the user specified @@ -1864,13 +2002,9 @@ def publishStatesByConfig(Map args) { if (yamlDir != null) { value_ = yamlDir.relativize(value_) } - def inputPath = val instanceof File ? val.toPath() : val - [value: value_, inputPath: inputPath, outputFilename: filename_ix] + return value_ } - def transposedOutputs = ["value", "inputPath", "outputFilename"].collectEntries{ key -> - [key, outputPerFile.collect{dic -> dic[key]}] - } - return [[key: plainName_] + transposedOutputs] + return [["key": plainName_, "value": outputPerFile]] } else { def value_ = java.nio.file.Paths.get(filename) // if id contains a slash @@ -1878,18 +2012,17 @@ def publishStatesByConfig(Map args) { value_ = yamlDir.relativize(value_) } def inputPath = value instanceof File ? 
value.toPath() : value - return [[key: plainName_, value: value_, inputPath: [inputPath], outputFilename: [filename]]] + return [["key": plainName_, value: value_]] } } + def updatedState_ = processedState.collectEntries{[it.key, it.value]} - def inputPaths = processedState.collectMany{it.inputPath} - def outputFilenames = processedState.collectMany{it.outputFilename} // convert state to yaml blob def yamlBlob_ = toTaggedYamlBlob([id: id_] + updatedState_) - [id_, yamlBlob_, yamlFilename, inputPaths, outputFilenames] + [id_, yamlBlob_, yamlFilename] } | publishStatesProc emit: input_ch @@ -2563,7 +2696,8 @@ def _debug(workflowArgs, debugKey) { def workflowFactory(Map args, Map defaultWfArgs, Map meta) { def workflowArgs = processWorkflowArgs(args, defaultWfArgs, meta) def key_ = workflowArgs["key"] - + def multipleArgs = meta.config.allArguments.findAll{ it.multiple }.collect{it.plainName} + workflow workflowInstance { take: input_ @@ -2720,12 +2854,36 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) { } // TODO: move some of the _meta.join_id wrangling to the safeJoin() function. - def chInitialOutput = chArgsWithDefaults + def chInitialOutputMulti = chArgsWithDefaults | _debug(workflowArgs, "processed") // run workflow | innerWorkflowFactory(workflowArgs) - // check output tuple - | map { id_, output_ -> + def chInitialOutputList = chInitialOutputMulti instanceof List ? chInitialOutputMulti : [chInitialOutputMulti] + assert chInitialOutputList.size() > 0: "should have emitted at least one output channel" + // Add a channel ID to the events, which designates the channel the event was emitted from as a running number + // This number is used to sort the events later when the events are gathered from across the channels. 
+ def chInitialOutputListWithIndexedEvents = chInitialOutputList.withIndex().collect{channel, channelIndex -> + def newChannel = channel + | map {tuple -> + assert tuple instanceof List : + "Error in module '${key_}': element in output channel should be a tuple [id, data, ...otherargs...]\n" + + " Example: [\"id\", [input: file('foo.txt'), arg: 10]].\n" + + " Expected class: List. Found: tuple.getClass() is ${tuple.getClass()}" + + def newEvent = [channelIndex] + tuple + return newEvent + } + return newChannel + } + // Put the events into 1 channel; this also covers the case where only one channel is emitted + def chInitialOutput = chInitialOutputList.size() > 1 ? \ + chInitialOutputListWithIndexedEvents[0].mix(*chInitialOutputListWithIndexedEvents.tail()) : \ + chInitialOutputListWithIndexedEvents[0] + def chInitialOutputProcessed = chInitialOutput + | map { tuple -> + def channelId = tuple[0] + def id_ = tuple[1] + def output_ = tuple[2] // see if output map contains metadata def meta_ = @@ -2738,19 +2896,94 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) { output_ = output_.findAll{k, v -> k != "_meta"} // check value types - output_ = _processOutputValues(output_, meta.config, id_, key_) + output_ = _checkValidOutputArgument(output_, meta.config, id_, key_) - // simplify output if need be - if (workflowArgs.auto.simplifyOutput && output_.size() == 1) { - output_ = output_.values()[0] - } - - [join_id, id_, output_] + [join_id, channelId, id_, output_] } // | view{"chInitialOutput: ${it.take(3)}"} + // join the output [prev_id, channel_id, new_id, output] with the previous state [prev_id, state, ...] + def chPublishWithPreviousState = safeJoin(chInitialOutputProcessed, chRunFiltered, key_) + // input tuple format: [join_id, channel_id, id, output, prev_state, ...] + // output tuple format: [join_id, channel_id, id, new_state, ...] 
+ | map{ tup -> + def new_state = workflowArgs.toState(tup.drop(2).take(3)) + tup.take(3) + [new_state] + tup.drop(5) + } + if (workflowArgs.auto.publish == "state") { + def chPublishFiles = chPublishWithPreviousState + // input tuple format: [join_id, channel_id, id, new_state, ...] + // output tuple format: [join_id, channel_id, id, new_state] + | map{ tup -> + tup.take(4) + } + + safeJoin(chPublishFiles, chArgsWithDefaults, key_) + // input tuple format: [join_id, channel_id, id, new_state, orig_state, ...] + // output tuple format: [id, new_state, orig_state] + | map { tup -> + tup.drop(2).take(3) + } + | publishFilesByConfig(key: key_, config: meta.config) + } + // Join the state from the events that were emitted from different channels + def chJoined = chInitialOutputProcessed + | map {tuple -> + def join_id = tuple[0] + def channel_id = tuple[1] + def id = tuple[2] + def other = tuple.drop(3) + // Below, groupTuple is used to join the events. To make sure resuming a workflow + // keeps working, the output state must be deterministic. This means the state needs to be + // sorted, for which groupTuple has a 'sort' argument. This argument can be set to 'hash', + // but hashing the state when it is large can be problematic in terms of performance. + // Therefore, a custom comparator function is provided. We add the channel ID to the + // states so that we can use the channel ID to sort the items. + def stateWithChannelID = [[channel_id] * other.size(), other].transpose() + // A comparator that is provided to groupTuple's 'sort' argument is applied + // to all elements of the event tuple (that is not the 'id'). The comparator + // closure that is used below expects the input to be a List. So the join_id and + // channel_id must also be wrapped in a list. 
+ [[join_id], [channel_id], id] + stateWithChannelID + } + | groupTuple(by: 2, sort: {a, b -> a[0] <=> b[0]}, size: chInitialOutputList.size(), remainder: true) + | map {join_ids, _, id, statesWithChannelID -> + // Remove the channel IDs from the states + def states = statesWithChannelID.collect{it[1]} + def newJoinId = join_ids.flatten().unique{a, b -> a <=> b} + assert newJoinId.size() == 1: "Multiple events were emitted for '$id'." + def newJoinIdUnique = newJoinId[0] + + // Merge the states from the different channels + def newState = states.inject([:]){ old_state, state_to_add -> + return old_state + state_to_add.collectEntries{k, v -> + if (!multipleArgs.contains(k)) { + // if the key is not a multiple argument, we expect only one value + if (old_state.containsKey(k)) { + assert old_state[k] == v : "ID $id: multiple entries for argument $k were emitted." + } + [k, v] + } else { + // if the key is a multiple argument, append the different values into one list + def prevValue = old_state.getOrDefault(k, []) + def prevValueAsList = prevValue instanceof List ? prevValue : [prevValue] + [k, prevValueAsList + v] + } + } + } + + _checkAllRequiredOuputsPresent(newState, meta.config, id, key_) + + // simplify output if need be + if (workflowArgs.auto.simplifyOutput && newState.size() == 1) { + newState = newState.values()[0] + } + + return [newJoinIdUnique, id, newState] + } + // join the output [prev_id, new_id, output] with the previous state [prev_id, state, ...] - def chNewState = safeJoin(chInitialOutput, chRunFiltered, key_) + def chNewState = safeJoin(chJoined, chRunFiltered, key_) // input tuple format: [join_id, id, output, prev_state, ...] // output tuple format: [join_id, id, new_state, ...] | map{ tup -> @@ -2759,23 +2992,21 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) { } if (workflowArgs.auto.publish == "state") { - def chPublish = chNewState + def chPublishStates = chNewState // input tuple format: [join_id, id, new_state, ...] 
// output tuple format: [join_id, id, new_state] | map{ tup -> tup.take(3) } - safeJoin(chPublish, chArgsWithDefaults, key_) + safeJoin(chPublishStates, chArgsWithDefaults, key_) // input tuple format: [join_id, id, new_state, orig_state, ...] // output tuple format: [id, new_state, orig_state] | map { tup -> tup.drop(1).take(3) - } + } | publishStatesByConfig(key: key_, config: meta.config) } - - // remove join_id and meta chReturn = chNewState | map { tup -> // input tuple format: [join_id, id, new_state, ...] @@ -2906,6 +3137,7 @@ meta = [ "is_executable" : true } ], + "summary" : "Concatenate a number of text files", "description" : "Concatenate a number of text files, handle gzipped text files gracefully and\noptionally gzip the output text file.\n\nThis component is useful for concatening fastq files from different lanes, for instance.\n", "test_resources" : [ { @@ -2918,6 +3150,10 @@ meta = [ "improvements" : "This component could be improved in 2 ways:\n 1. Allow for a mix of zipped and plain input files\n 2. 
Allow to specify a compression algorithm for the output\n" }, "status" : "enabled", + "scope" : { + "image" : "public", + "target" : "public" + }, "requirements" : { "commands" : [ "ps" @@ -3030,16 +3266,16 @@ meta = [ "runner" : "nextflow", "engine" : "docker|native", "output" : "target/nextflow/concat_text", - "viash_version" : "0.9.0", - "git_commit" : "3c8413009764e3a6839e3e8b038857caf7047593", + "viash_version" : "0.9.3", + "git_commit" : "8f9353f15e4d6952eca57e896f962a60b42b0a3c", "git_remote" : "https://github.com/viash-hub/craftbox", - "git_tag" : "v0.1.0-3-g3c84130" + "git_tag" : "v0.1.0-4-g8f9353f" }, "package_config" : { "name" : "craftbox", "version" : "main", "description" : "A collection of custom-tailored scripts and applied tools.\n", - "viash_version" : "0.9.0", + "viash_version" : "0.9.3", "source" : "src", "target" : "target", "config_mods" : [ @@ -3465,7 +3701,7 @@ def _vdsl3ProcessFactory(Map workflowArgs, Map meta, String rawScript) { // create process from temp file def binding = new nextflow.script.ScriptBinding([:]) def session = nextflow.Nextflow.getSession() - def parser = new nextflow.script.ScriptParser(session) + def parser = _getScriptLoader(session) .setModule(true) .setBinding(binding) def moduleScript = parser.runScript(tempFile) @@ -3479,6 +3715,27 @@ def _vdsl3ProcessFactory(Map workflowArgs, Map meta, String rawScript) { return scriptMeta.getProcess(procKey) } +// use Reflection to get a ScriptParser / ScriptLoader +// <25.02.0-edge: new nextflow.script.ScriptParser(session) +// >=25.02.0-edge: nextflow.script.ScriptLoaderFactory.create(session) +def _getScriptLoader(nextflow.Session session) { + // try using the old method + try { + Class scriptParserClass = Class.forName('nextflow.script.ScriptParser') + return scriptParserClass.getDeclaredConstructor(nextflow.Session).newInstance(session) + } catch (ClassNotFoundException e) { + // else try with the new method + try { + Class scriptLoaderFactoryClass = 
Class.forName('nextflow.script.ScriptLoaderFactory') + def createMethod = scriptLoaderFactoryClass.getDeclaredMethod('create', nextflow.Session) + return createMethod.invoke(null, session) // null because create is static + } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | java.lang.reflect.InvocationTargetException e2) { + // Handle the case where neither class is found + throw new Exception("Neither nextflow.script.ScriptParser nor nextflow.script.ScriptLoaderFactory could be found. Is this a compatible Nextflow version?", e2) + } + } +} + // defaults meta["defaults"] = [ // key to be used to trace the process and determine output names diff --git a/target/nextflow/csv2fasta/.config.vsh.yaml b/target/nextflow/csv2fasta/.config.vsh.yaml index 8c5d1a2..e5499f1 100644 --- a/target/nextflow/csv2fasta/.config.vsh.yaml +++ b/target/nextflow/csv2fasta/.config.vsh.yaml @@ -1,5 +1,36 @@ name: "csv2fasta" version: "main" +authors: +- name: "Dries Schaumont" + roles: + - "author" + - "maintainer" + info: + links: + email: "dries@data-intuitive.com" + github: "DriesSchaumont" + orcid: "0000-0002-4389-0440" + linkedin: "dries-schaumont" + organizations: + - name: "Data Intuitive" + href: "https://www.data-intuitive.com" + role: "Data Scientist" +- name: "Robrecht Cannoodt" + roles: + - "reviewer" + info: + links: + email: "robrecht@data-intuitive.com" + github: "rcannood" + orcid: "0000-0003-3641-729X" + linkedin: "robrechtcannoodt" + organizations: + - name: "Data Intuitive" + href: "https://www.data-intuitive.com" + role: "Data Science Engineer" + - name: "Open Problems" + href: "https://openproblems.bio" + role: "Core Member" argument_groups: - name: "Inputs" arguments: @@ -104,6 +135,7 @@ resources: - type: "python_script" path: "script.py" is_executable: true +summary: "Convert a CSV file to FASTA entries" description: "Convert two columns from a CSV file to FASTA entries. 
The CSV file can\n\ contain an optional header and each row (other than the header) becomes\na single\ \ FASTA record. One of the two columns will be used as the names\nfor the FASTA\ @@ -116,6 +148,9 @@ test_resources: is_executable: true info: null status: "enabled" +scope: + image: "public" + target: "public" requirements: commands: - "ps" @@ -221,16 +256,16 @@ build_info: engine: "docker|native" output: "target/nextflow/csv2fasta" executable: "target/nextflow/csv2fasta/main.nf" - viash_version: "0.9.0" - git_commit: "3c8413009764e3a6839e3e8b038857caf7047593" + viash_version: "0.9.3" + git_commit: "8f9353f15e4d6952eca57e896f962a60b42b0a3c" git_remote: "https://github.com/viash-hub/craftbox" - git_tag: "v0.1.0-3-g3c84130" + git_tag: "v0.1.0-4-g8f9353f" package_config: name: "craftbox" version: "main" description: "A collection of custom-tailored scripts and applied tools.\n" info: null - viash_version: "0.9.0" + viash_version: "0.9.3" source: "src" target: "target" config_mods: diff --git a/target/nextflow/csv2fasta/main.nf b/target/nextflow/csv2fasta/main.nf index f77f8c1..f4c5c8a 100644 --- a/target/nextflow/csv2fasta/main.nf +++ b/target/nextflow/csv2fasta/main.nf @@ -1,6 +1,6 @@ // csv2fasta main // -// This wrapper script is auto-generated by viash 0.9.0 and is thus a derivative +// This wrapper script is auto-generated by viash 0.9.3 and is thus a derivative // work thereof. This software comes with ABSOLUTELY NO WARRANTY from Data // Intuitive. // @@ -8,6 +8,10 @@ // authors of this component should specify the license in the header of such // files, or include a separate license file detailing the licenses of all included // files. 
+// +// Component authors: +// * Dries Schaumont (author, maintainer) +// * Robrecht Cannoodt (reviewer) //////////////////////////// // VDSL3 helper functions // @@ -173,7 +177,7 @@ def _checkArgumentType(String stage, Map par, Object value, String errorIdentifi Map _processInputValues(Map inputs, Map config, String id, String key) { if (!workflow.stubRun) { config.allArguments.each { arg -> - if (arg.required) { + if (arg.required && arg.direction == "input") { assert inputs.containsKey(arg.plainName) && inputs.get(arg.plainName) != null : "Error in module '${key}' id '${id}': required input argument '${arg.plainName}' is missing" } @@ -192,15 +196,8 @@ Map _processInputValues(Map inputs, Map config, String id, String key) { } // helper file: 'src/main/resources/io/viash/runners/nextflow/arguments/_processOutputValues.nf' -Map _processOutputValues(Map outputs, Map config, String id, String key) { +Map _checkValidOutputArgument(Map outputs, Map config, String id, String key) { if (!workflow.stubRun) { - config.allArguments.each { arg -> - if (arg.direction == "output" && arg.required) { - assert outputs.containsKey(arg.plainName) && outputs.get(arg.plainName) != null : - "Error in module '${key}' id '${id}': required output argument '${arg.plainName}' is missing" - } - } - outputs = outputs.collectEntries { name, value -> def par = config.allArguments.find { it.plainName == name && it.direction == "output" } assert par != null : "Error in module '${key}' id '${id}': '${name}' is not a valid output argument" @@ -213,6 +210,16 @@ Map _processOutputValues(Map outputs, Map config, String id, String key) { return outputs } +void _checkAllRequiredOuputsPresent(Map outputs, Map config, String id, String key) { + if (!workflow.stubRun) { + config.allArguments.each { arg -> + if (arg.direction == "output" && arg.required) { + assert outputs.containsKey(arg.plainName) && outputs.get(arg.plainName) != null : + "Error in module '${key}' id '${id}': required output argument 
'${arg.plainName}' is missing" + } + } + } +} // helper file: 'src/main/resources/io/viash/runners/nextflow/channel/IDChecker.nf' class IDChecker { final def items = [] as Set @@ -1666,6 +1673,162 @@ def joinStates(Closure apply_) { } return joinStatesWf } +// helper file: 'src/main/resources/io/viash/runners/nextflow/states/publishFiles.nf' +def publishFiles(Map args) { + def key_ = args.get("key") + + assert key_ != null : "publishFiles: key must be specified" + + workflow publishFilesWf { + take: input_ch + main: + input_ch + | map { tup -> + def id_ = tup[0] + def state_ = tup[1] + + // the input files and the target output filenames + def inputoutputFilenames_ = collectInputOutputPaths(state_, id_ + "." + key_).transpose() + def inputFiles_ = inputoutputFilenames_[0] + def outputFilenames_ = inputoutputFilenames_[1] + + [id_, inputFiles_, outputFilenames_] + } + | publishFilesProc + emit: input_ch + } + return publishFilesWf +} + +process publishFilesProc { + // todo: check publishpath? + publishDir path: "${getPublishDir()}/", mode: "copy" + tag "$id" + input: + tuple val(id), path(inputFiles, stageAs: "_inputfile?/*"), val(outputFiles) + output: + tuple val(id), path{outputFiles} + script: + def copyCommands = [ + inputFiles instanceof List ? inputFiles : [inputFiles], + outputFiles instanceof List ? 
outputFiles : [outputFiles] + ] + .transpose() + .collectMany{infile, outfile -> + if (infile.toString() != outfile.toString()) { + [ + "[ -d \"\$(dirname '${outfile.toString()}')\" ] || mkdir -p \"\$(dirname '${outfile.toString()}')\"", + "cp -r '${infile.toString()}' '${outfile.toString()}'" + ] + } else { + // no need to copy if infile is the same as outfile + [] + } + } + """ + echo "Copying output files to destination folder" + ${copyCommands.join("\n ")} + """ +} + + +// this assumes that the state contains no other values other than those specified in the config +def publishFilesByConfig(Map args) { + def config = args.get("config") + assert config != null : "publishFilesByConfig: config must be specified" + + def key_ = args.get("key", config.name) + assert key_ != null : "publishFilesByConfig: key must be specified" + + workflow publishFilesSimpleWf { + take: input_ch + main: + input_ch + | map { tup -> + def id_ = tup[0] + def state_ = tup[1] // e.g. [output: new File("myoutput.h5ad"), k: 10] + def origState_ = tup[2] // e.g. 
[output: '$id.$key.foo.h5ad'] + + + // the processed state is a list of [key, value, inputPath, outputFilename] tuples, where + // - key is a String + // - value is any object that can be serialized to a Yaml (so a String/Integer/Long/Double/Boolean, a List, a Map, or a Path) + // - inputPath is a List[Path] + // - outputFilename is a List[String] + // - (inputPath, outputFilename) are the files that will be copied from src to dest (relative to the state.yaml) + def processedState = + config.allArguments + .findAll { it.direction == "output" } + .collectMany { par -> + def plainName_ = par.plainName + // if the state does not contain the key, it's an + // optional argument for which the component did + // not generate any output OR multiple channels were emitted + // and the output was just not added to using the channel + // that is now being parsed + if (!state_.containsKey(plainName_)) { + return [] + } + def value = state_[plainName_] + // if the parameter is not a file, it should be stored + // in the state as-is, but is not something that needs + // to be copied from the source path to the dest path + if (par.type != "file") { + return [[inputPath: [], outputFilename: []]] + } + // if the orig state does not contain this filename, + // it's an optional argument for which the user specified + // that it should not be returned as a state + if (!origState_.containsKey(plainName_)) { + return [] + } + def filenameTemplate = origState_[plainName_] + // if the pararameter is multiple: true, fetch the template + if (par.multiple && filenameTemplate instanceof List) { + filenameTemplate = filenameTemplate[0] + } + // instantiate the template + def filename = filenameTemplate + .replaceAll('\\$id', id_) + .replaceAll('\\$\\{id\\}', id_) + .replaceAll('\\$key', key_) + .replaceAll('\\$\\{key\\}', key_) + if (par.multiple) { + // if the parameter is multiple: true, the filename + // should contain a wildcard '*' that is replaced with + // the index of the file + assert 
filename.contains("*") : "Module '${key_}' id '${id_}': Multiple output files specified, but no wildcard '*' in the filename: ${filename}" + def outputPerFile = value.withIndex().collect{ val, ix -> + def filename_ix = filename.replace("*", ix.toString()) + def inputPath = val instanceof File ? val.toPath() : val + [inputPath: inputPath, outputFilename: filename_ix] + } + def transposedOutputs = ["inputPath", "outputFilename"].collectEntries{ key -> + [key, outputPerFile.collect{dic -> dic[key]}] + } + return [[key: plainName_] + transposedOutputs] + } else { + def value_ = java.nio.file.Paths.get(filename) + def inputPath = value instanceof File ? value.toPath() : value + return [[inputPath: [inputPath], outputFilename: [filename]]] + } + } + + def inputPaths = processedState.collectMany{it.inputPath} + def outputFilenames = processedState.collectMany{it.outputFilename} + + + [id_, inputPaths, outputFilenames] + } + | publishFilesProc + emit: input_ch + } + return publishFilesSimpleWf +} + + + + // helper file: 'src/main/resources/io/viash/runners/nextflow/states/publishStates.nf' def collectFiles(obj) { if (obj instanceof java.io.File || obj instanceof Path) { @@ -1723,8 +1886,6 @@ def publishStates(Map args) { // the input files and the target output filenames def inputoutputFilenames_ = collectInputOutputPaths(state_, id_ + "." 
+ key_).transpose() - def inputFiles_ = inputoutputFilenames_[0] - def outputFilenames_ = inputoutputFilenames_[1] def yamlFilename = yamlTemplate_ .replaceAll('\\$id', id_) @@ -1737,7 +1898,7 @@ def publishStates(Map args) { // convert state to yaml blob def yamlBlob_ = toRelativeTaggedYamlBlob([id: id_] + state_, java.nio.file.Paths.get(yamlFilename)) - [id_, yamlBlob_, yamlFilename, inputFiles_, outputFilenames_] + [id_, yamlBlob_, yamlFilename] } | publishStatesProc emit: input_ch @@ -1749,33 +1910,17 @@ process publishStatesProc { publishDir path: "${getPublishDir()}/", mode: "copy" tag "$id" input: - tuple val(id), val(yamlBlob), val(yamlFile), path(inputFiles, stageAs: "_inputfile?/*"), val(outputFiles) + tuple val(id), val(yamlBlob), val(yamlFile) output: - tuple val(id), path{[yamlFile] + outputFiles} + tuple val(id), path{[yamlFile]} script: - def copyCommands = [ - inputFiles instanceof List ? inputFiles : [inputFiles], - outputFiles instanceof List ? outputFiles : [outputFiles] - ] - .transpose() - .collectMany{infile, outfile -> - if (infile.toString() != outfile.toString()) { - [ - "[ -d \"\$(dirname '${outfile.toString()}')\" ] || mkdir -p \"\$(dirname '${outfile.toString()}')\"", - "cp -r '${infile.toString()}' '${outfile.toString()}'" - ] - } else { - // no need to copy if infile is the same as outfile - [] - } - } """ -mkdir -p "\$(dirname '${yamlFile}')" -echo "Storing state as yaml" -echo '${yamlBlob}' > '${yamlFile}' -echo "Copying output files to destination folder" -${copyCommands.join("\n ")} -""" + mkdir -p "\$(dirname '${yamlFile}')" + echo "Storing state as yaml" + cat > '${yamlFile}' << HERE +${yamlBlob} +HERE + """ } @@ -1806,13 +1951,10 @@ def publishStatesByConfig(Map args) { .replaceAll('\\$\\{key\\}', key_) def yamlDir = java.nio.file.Paths.get(yamlFilename).getParent() - // the processed state is a list of [key, value, inputPath, outputFilename] tuples, where + // the processed state is a list of [key, value] tuples, where // - key 
is a String // - value is any object that can be serialized to a Yaml (so a String/Integer/Long/Double/Boolean, a List, a Map, or a Path) - // - inputPath is a List[Path] - // - outputFilename is a List[String] // - (key, value) are the tuples that will be saved to the state.yaml file - // - (inputPath, outputFilename) are the files that will be copied from src to dest (relative to the state.yaml) def processedState = config.allArguments .findAll { it.direction == "output" } @@ -1829,7 +1971,7 @@ def publishStatesByConfig(Map args) { // in the state as-is, but is not something that needs // to be copied from the source path to the dest path if (par.type != "file") { - return [[key: plainName_, value: value, inputPath: [], outputFilename: []]] + return [[key: plainName_, value: value]] } // if the orig state does not contain this filename, // it's an optional argument for which the user specified @@ -1860,13 +2002,9 @@ def publishStatesByConfig(Map args) { if (yamlDir != null) { value_ = yamlDir.relativize(value_) } - def inputPath = val instanceof File ? val.toPath() : val - [value: value_, inputPath: inputPath, outputFilename: filename_ix] + return value_ } - def transposedOutputs = ["value", "inputPath", "outputFilename"].collectEntries{ key -> - [key, outputPerFile.collect{dic -> dic[key]}] - } - return [[key: plainName_] + transposedOutputs] + return [["key": plainName_, "value": outputPerFile]] } else { def value_ = java.nio.file.Paths.get(filename) // if id contains a slash @@ -1874,18 +2012,17 @@ def publishStatesByConfig(Map args) { value_ = yamlDir.relativize(value_) } def inputPath = value instanceof File ? 
value.toPath() : value - return [[key: plainName_, value: value_, inputPath: [inputPath], outputFilename: [filename]]] + return [["key": plainName_, value: value_]] } } + def updatedState_ = processedState.collectEntries{[it.key, it.value]} - def inputPaths = processedState.collectMany{it.inputPath} - def outputFilenames = processedState.collectMany{it.outputFilename} // convert state to yaml blob def yamlBlob_ = toTaggedYamlBlob([id: id_] + updatedState_) - [id_, yamlBlob_, yamlFilename, inputPaths, outputFilenames] + [id_, yamlBlob_, yamlFilename] } | publishStatesProc emit: input_ch @@ -2559,7 +2696,8 @@ def _debug(workflowArgs, debugKey) { def workflowFactory(Map args, Map defaultWfArgs, Map meta) { def workflowArgs = processWorkflowArgs(args, defaultWfArgs, meta) def key_ = workflowArgs["key"] - + def multipleArgs = meta.config.allArguments.findAll{ it.multiple }.collect{it.plainName} + workflow workflowInstance { take: input_ @@ -2716,12 +2854,36 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) { } // TODO: move some of the _meta.join_id wrangling to the safeJoin() function. - def chInitialOutput = chArgsWithDefaults + def chInitialOutputMulti = chArgsWithDefaults | _debug(workflowArgs, "processed") // run workflow | innerWorkflowFactory(workflowArgs) - // check output tuple - | map { id_, output_ -> + def chInitialOutputList = chInitialOutputMulti instanceof List ? chInitialOutputMulti : [chInitialOutputMulti] + assert chInitialOutputList.size() > 0: "should have emitted at least one output channel" + // Add a channel ID to the events, which designates the channel the event was emitted from as a running number + // This number is used to sort the events later when the events are gathered from across the channels. 
+ def chInitialOutputListWithIndexedEvents = chInitialOutputList.withIndex().collect{channel, channelIndex -> + def newChannel = channel + | map {tuple -> + assert tuple instanceof List : + "Error in module '${key_}': element in output channel should be a tuple [id, data, ...otherargs...]\n" + + " Example: [\"id\", [input: file('foo.txt'), arg: 10]].\n" + + " Expected class: List. Found: tuple.getClass() is ${tuple.getClass()}" + + def newEvent = [channelIndex] + tuple + return newEvent + } + return newChannel + } + // Put the events into 1 channel, cover case where there is only one channel is emitted + def chInitialOutput = chInitialOutputList.size() > 1 ? \ + chInitialOutputListWithIndexedEvents[0].mix(*chInitialOutputListWithIndexedEvents.tail()) : \ + chInitialOutputListWithIndexedEvents[0] + def chInitialOutputProcessed = chInitialOutput + | map { tuple -> + def channelId = tuple[0] + def id_ = tuple[1] + def output_ = tuple[2] // see if output map contains metadata def meta_ = @@ -2734,19 +2896,94 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) { output_ = output_.findAll{k, v -> k != "_meta"} // check value types - output_ = _processOutputValues(output_, meta.config, id_, key_) + output_ = _checkValidOutputArgument(output_, meta.config, id_, key_) - // simplify output if need be - if (workflowArgs.auto.simplifyOutput && output_.size() == 1) { - output_ = output_.values()[0] - } - - [join_id, id_, output_] + [join_id, channelId, id_, output_] } // | view{"chInitialOutput: ${it.take(3)}"} + // join the output [prev_id, channel_id, new_id, output] with the previous state [prev_id, state, ...] + def chPublishWithPreviousState = safeJoin(chInitialOutputProcessed, chRunFiltered, key_) + // input tuple format: [join_id, channel_id, id, output, prev_state, ...] + // output tuple format: [join_id, channel_id, id, new_state, ...] 
+ | map{ tup -> + def new_state = workflowArgs.toState(tup.drop(2).take(3)) + tup.take(3) + [new_state] + tup.drop(5) + } + if (workflowArgs.auto.publish == "state") { + def chPublishFiles = chPublishWithPreviousState + // input tuple format: [join_id, channel_id, id, new_state, ...] + // output tuple format: [join_id, channel_id, id, new_state] + | map{ tup -> + tup.take(4) + } + + safeJoin(chPublishFiles, chArgsWithDefaults, key_) + // input tuple format: [join_id, channel_id, id, new_state, orig_state, ...] + // output tuple format: [id, new_state, orig_state] + | map { tup -> + tup.drop(2).take(3) + } + | publishFilesByConfig(key: key_, config: meta.config) + } + // Join the state from the events that were emitted from different channels + def chJoined = chInitialOutputProcessed + | map {tuple -> + def join_id = tuple[0] + def channel_id = tuple[1] + def id = tuple[2] + def other = tuple.drop(3) + // Below, groupTuple is used to join the events. To make sure resuming a workflow + // keeps working, the output state must be deterministic. This means the state needs to be + // sorted with groupTuple's has a 'sort' argument. This argument can be set to 'hash', + // but hashing the state when it is large can be problematic in terms of performance. + // Therefore, a custom comparator function is provided. We add the channel ID to the + // states so that we can use the channel ID to sort the items. + def stateWithChannelID = [[channel_id] * other.size(), other].transpose() + // A comparator that is provided to groupTuple's 'sort' argument is applied + // to all elements of the event tuple (that is not the 'id'). The comparator + // closure that is used below expects the input to be List. So the join_id and + // channel_id must also be wrapped in a list. 
+ [[join_id], [channel_id], id] + stateWithChannelID + } + | groupTuple(by: 2, sort: {a, b -> a[0] <=> b[0]}, size: chInitialOutputList.size(), remainder: true) + | map {join_ids, _, id, statesWithChannelID -> + // Remove the channel IDs from the states + def states = statesWithChannelID.collect{it[1]} + def newJoinId = join_ids.flatten().unique{a, b -> a <=> b} + assert newJoinId.size() == 1: "Multiple events were emitted for '$id'." + def newJoinIdUnique = newJoinId[0] + + // Merge the states from the different channels + def newState = states.inject([:]){ old_state, state_to_add -> + return old_state + state_to_add.collectEntries{k, v -> + if (!multipleArgs.contains(k)) { + // if the key is not a multiple argument, we expect only one value + if (old_state.containsKey(k)) { + assert old_state[k] == v : "ID $id: multiple entries for argument $k were emitted." + } + [k, v] + } else { + // if the key is a multiple argument, append the different values into one list + def prevValue = old_state.getOrDefault(k, []) + def prevValueAsList = prevValue instanceof List ? prevValue : [prevValue] + [k, prevValueAsList + v] + } + } + } + + _checkAllRequiredOuputsPresent(newState, meta.config, id, key_) + + // simplify output if need be + if (workflowArgs.auto.simplifyOutput && newState.size() == 1) { + newState = newState.values()[0] + } + + return [newJoinIdUnique, id, newState] + } + // join the output [prev_id, new_id, output] with the previous state [prev_id, state, ...] - def chNewState = safeJoin(chInitialOutput, chRunFiltered, key_) + def chNewState = safeJoin(chJoined, chRunFiltered, key_) // input tuple format: [join_id, id, output, prev_state, ...] // output tuple format: [join_id, id, new_state, ...] | map{ tup -> @@ -2755,23 +2992,21 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) { } if (workflowArgs.auto.publish == "state") { - def chPublish = chNewState + def chPublishStates = chNewState // input tuple format: [join_id, id, new_state, ...] 
// output tuple format: [join_id, id, new_state] | map{ tup -> tup.take(3) } - safeJoin(chPublish, chArgsWithDefaults, key_) + safeJoin(chPublishStates, chArgsWithDefaults, key_) // input tuple format: [join_id, id, new_state, orig_state, ...] // output tuple format: [id, new_state, orig_state] | map { tup -> tup.drop(1).take(3) - } + } | publishStatesByConfig(key: key_, config: meta.config) } - - // remove join_id and meta chReturn = chNewState | map { tup -> // input tuple format: [join_id, id, new_state, ...] @@ -2806,6 +3041,56 @@ meta = [ "config": processConfig(readJsonBlob('''{ "name" : "csv2fasta", "version" : "main", + "authors" : [ + { + "name" : "Dries Schaumont", + "roles" : [ + "author", + "maintainer" + ], + "info" : { + "links" : { + "email" : "dries@data-intuitive.com", + "github" : "DriesSchaumont", + "orcid" : "0000-0002-4389-0440", + "linkedin" : "dries-schaumont" + }, + "organizations" : [ + { + "name" : "Data Intuitive", + "href" : "https://www.data-intuitive.com", + "role" : "Data Scientist" + } + ] + } + }, + { + "name" : "Robrecht Cannoodt", + "roles" : [ + "reviewer" + ], + "info" : { + "links" : { + "email" : "robrecht@data-intuitive.com", + "github" : "rcannood", + "orcid" : "0000-0003-3641-729X", + "linkedin" : "robrechtcannoodt" + }, + "organizations" : [ + { + "name" : "Data Intuitive", + "href" : "https://www.data-intuitive.com", + "role" : "Data Science Engineer" + }, + { + "name" : "Open Problems", + "href" : "https://openproblems.bio", + "role" : "Core Member" + } + ] + } + } + ], "argument_groups" : [ { "name" : "Inputs", @@ -2927,6 +3212,7 @@ meta = [ "is_executable" : true } ], + "summary" : "Convert a CSV file to FASTA entries", "description" : "Convert two columns from a CSV file to FASTA entries. The CSV file can\ncontain an optional header and each row (other than the header) becomes\na single FASTA record. One of the two columns will be used as the names\nfor the FASTA entries, while the other become the sequences. 
The sequences\ncolumn must only contain characters that are valid IUPAC notation for \nnucleotides or a group thereof (wildcard characters).\n", "test_resources" : [ { @@ -2936,6 +3222,10 @@ meta = [ } ], "status" : "enabled", + "scope" : { + "image" : "public", + "target" : "public" + }, "requirements" : { "commands" : [ "ps" @@ -3066,16 +3356,16 @@ meta = [ "runner" : "nextflow", "engine" : "docker|native", "output" : "target/nextflow/csv2fasta", - "viash_version" : "0.9.0", - "git_commit" : "3c8413009764e3a6839e3e8b038857caf7047593", + "viash_version" : "0.9.3", + "git_commit" : "8f9353f15e4d6952eca57e896f962a60b42b0a3c", "git_remote" : "https://github.com/viash-hub/craftbox", - "git_tag" : "v0.1.0-3-g3c84130" + "git_tag" : "v0.1.0-4-g8f9353f" }, "package_config" : { "name" : "craftbox", "version" : "main", "description" : "A collection of custom-tailored scripts and applied tools.\n", - "viash_version" : "0.9.0", + "viash_version" : "0.9.3", "source" : "src", "target" : "target", "config_mods" : [ @@ -3106,7 +3396,7 @@ meta = [ // inner workflow hook def innerWorkflowFactory(args) { def rawScript = '''set -e -tempscript=".viash_script.sh" +tempscript=".viash_script.py" cat > "$tempscript" << VIASHMAIN from pathlib import Path import dnaio @@ -3577,7 +3867,7 @@ def _vdsl3ProcessFactory(Map workflowArgs, Map meta, String rawScript) { // create process from temp file def binding = new nextflow.script.ScriptBinding([:]) def session = nextflow.Nextflow.getSession() - def parser = new nextflow.script.ScriptParser(session) + def parser = _getScriptLoader(session) .setModule(true) .setBinding(binding) def moduleScript = parser.runScript(tempFile) @@ -3591,6 +3881,27 @@ def _vdsl3ProcessFactory(Map workflowArgs, Map meta, String rawScript) { return scriptMeta.getProcess(procKey) } +// use Reflection to get a ScriptParser / ScriptLoader +// <25.02.0-edge: new nextflow.script.ScriptParser(session) +// >=25.02.0-edge: nextflow.script.ScriptLoaderFactory.create(session) 
+def _getScriptLoader(nextflow.Session session) { + // try using the old method + try { + Class scriptParserClass = Class.forName('nextflow.script.ScriptParser') + return scriptParserClass.getDeclaredConstructor(nextflow.Session).newInstance(session) + } catch (ClassNotFoundException e) { + // else try with the new method + try { + Class scriptLoaderFactoryClass = Class.forName('nextflow.script.ScriptLoaderFactory') + def createMethod = scriptLoaderFactoryClass.getDeclaredMethod('create', nextflow.Session) + return createMethod.invoke(null, session) // null because create is static + } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | java.lang.reflect.InvocationTargetException e2) { + // Handle the case where neither class is found + throw new Exception("Neither nextflow.script.ScriptParser nor nextflow.script.ScriptLoaderFactory could be found. Is this a compatible Nextflow version?", e2) + } + } +} + // defaults meta["defaults"] = [ // key to be used to trace the process and determine output names diff --git a/target/nextflow/csv2fasta/nextflow.config b/target/nextflow/csv2fasta/nextflow.config index cda7cfa..bb5e859 100644 --- a/target/nextflow/csv2fasta/nextflow.config +++ b/target/nextflow/csv2fasta/nextflow.config @@ -4,6 +4,7 @@ manifest { nextflowVersion = '!>=20.12.1-edge' version = 'main' description = 'Convert two columns from a CSV file to FASTA entries. The CSV file can\ncontain an optional header and each row (other than the header) becomes\na single FASTA record. One of the two columns will be used as the names\nfor the FASTA entries, while the other become the sequences. 
The sequences\ncolumn must only contain characters that are valid IUPAC notation for \nnucleotides or a group thereof (wildcard characters).\n' + author = 'Dries Schaumont, Robrecht Cannoodt' } process.container = 'nextflow/bash:latest' diff --git a/target/nextflow/sync_resources/.config.vsh.yaml b/target/nextflow/sync_resources/.config.vsh.yaml index d71d0b3..b27e2f4 100644 --- a/target/nextflow/sync_resources/.config.vsh.yaml +++ b/target/nextflow/sync_resources/.config.vsh.yaml @@ -1,5 +1,36 @@ name: "sync_resources" version: "main" +authors: +- name: "Robrecht Cannoodt" + roles: + - "author" + - "maintainer" + info: + links: + email: "robrecht@data-intuitive.com" + github: "rcannood" + orcid: "0000-0003-3641-729X" + linkedin: "robrechtcannoodt" + organizations: + - name: "Data Intuitive" + href: "https://www.data-intuitive.com" + role: "Data Science Engineer" + - name: "Open Problems" + href: "https://openproblems.bio" + role: "Core Member" +- name: "Dries Schaumont" + roles: + - "reviewer" + info: + links: + email: "dries@data-intuitive.com" + github: "DriesSchaumont" + orcid: "0000-0002-4389-0440" + linkedin: "dries-schaumont" + organizations: + - name: "Data Intuitive" + href: "https://www.data-intuitive.com" + role: "Data Scientist" argument_groups: - name: "Inputs" arguments: @@ -53,7 +84,10 @@ resources: - type: "bash_script" path: "script.sh" is_executable: true -description: "Sync a Viash package's test resources to the local filesystem" +summary: "Sync a Viash package's test resources to the local filesystem" +description: "Sync a Viash package's test resources to the local filesystem based\ + \ on the\nthe `.info.test_resources` field in the `_viash.yaml` file. 
This is useful\ + \ for\ntesting and debugging purposes.\n" usage: "sync_resources\nsync_resources --input _viash.yaml --output .\n" test_resources: - type: "bash_script" @@ -61,6 +95,9 @@ test_resources: is_executable: true info: null status: "enabled" +scope: + image: "public" + target: "public" requirements: commands: - "ps" @@ -159,16 +196,16 @@ build_info: engine: "docker|native" output: "target/nextflow/sync_resources" executable: "target/nextflow/sync_resources/main.nf" - viash_version: "0.9.0" - git_commit: "3c8413009764e3a6839e3e8b038857caf7047593" + viash_version: "0.9.3" + git_commit: "8f9353f15e4d6952eca57e896f962a60b42b0a3c" git_remote: "https://github.com/viash-hub/craftbox" - git_tag: "v0.1.0-3-g3c84130" + git_tag: "v0.1.0-4-g8f9353f" package_config: name: "craftbox" version: "main" description: "A collection of custom-tailored scripts and applied tools.\n" info: null - viash_version: "0.9.0" + viash_version: "0.9.3" source: "src" target: "target" config_mods: diff --git a/target/nextflow/sync_resources/main.nf b/target/nextflow/sync_resources/main.nf index 95e0db9..8b1c700 100644 --- a/target/nextflow/sync_resources/main.nf +++ b/target/nextflow/sync_resources/main.nf @@ -1,6 +1,6 @@ // sync_resources main // -// This wrapper script is auto-generated by viash 0.9.0 and is thus a derivative +// This wrapper script is auto-generated by viash 0.9.3 and is thus a derivative // work thereof. This software comes with ABSOLUTELY NO WARRANTY from Data // Intuitive. // @@ -8,6 +8,10 @@ // authors of this component should specify the license in the header of such // files, or include a separate license file detailing the licenses of all included // files. 
+// +// Component authors: +// * Robrecht Cannoodt (author, maintainer) +// * Dries Schaumont (reviewer) //////////////////////////// // VDSL3 helper functions // @@ -173,7 +177,7 @@ def _checkArgumentType(String stage, Map par, Object value, String errorIdentifi Map _processInputValues(Map inputs, Map config, String id, String key) { if (!workflow.stubRun) { config.allArguments.each { arg -> - if (arg.required) { + if (arg.required && arg.direction == "input") { assert inputs.containsKey(arg.plainName) && inputs.get(arg.plainName) != null : "Error in module '${key}' id '${id}': required input argument '${arg.plainName}' is missing" } @@ -192,15 +196,8 @@ Map _processInputValues(Map inputs, Map config, String id, String key) { } // helper file: 'src/main/resources/io/viash/runners/nextflow/arguments/_processOutputValues.nf' -Map _processOutputValues(Map outputs, Map config, String id, String key) { +Map _checkValidOutputArgument(Map outputs, Map config, String id, String key) { if (!workflow.stubRun) { - config.allArguments.each { arg -> - if (arg.direction == "output" && arg.required) { - assert outputs.containsKey(arg.plainName) && outputs.get(arg.plainName) != null : - "Error in module '${key}' id '${id}': required output argument '${arg.plainName}' is missing" - } - } - outputs = outputs.collectEntries { name, value -> def par = config.allArguments.find { it.plainName == name && it.direction == "output" } assert par != null : "Error in module '${key}' id '${id}': '${name}' is not a valid output argument" @@ -213,6 +210,16 @@ Map _processOutputValues(Map outputs, Map config, String id, String key) { return outputs } +void _checkAllRequiredOuputsPresent(Map outputs, Map config, String id, String key) { + if (!workflow.stubRun) { + config.allArguments.each { arg -> + if (arg.direction == "output" && arg.required) { + assert outputs.containsKey(arg.plainName) && outputs.get(arg.plainName) != null : + "Error in module '${key}' id '${id}': required output argument 
'${arg.plainName}' is missing" + } + } + } +} // helper file: 'src/main/resources/io/viash/runners/nextflow/channel/IDChecker.nf' class IDChecker { final def items = [] as Set @@ -1666,6 +1673,162 @@ def joinStates(Closure apply_) { } return joinStatesWf } +// helper file: 'src/main/resources/io/viash/runners/nextflow/states/publishFiles.nf' +def publishFiles(Map args) { + def key_ = args.get("key") + + assert key_ != null : "publishFiles: key must be specified" + + workflow publishFilesWf { + take: input_ch + main: + input_ch + | map { tup -> + def id_ = tup[0] + def state_ = tup[1] + + // the input files and the target output filenames + def inputoutputFilenames_ = collectInputOutputPaths(state_, id_ + "." + key_).transpose() + def inputFiles_ = inputoutputFilenames_[0] + def outputFilenames_ = inputoutputFilenames_[1] + + [id_, inputFiles_, outputFilenames_] + } + | publishFilesProc + emit: input_ch + } + return publishFilesWf +} + +process publishFilesProc { + // todo: check publishpath? + publishDir path: "${getPublishDir()}/", mode: "copy" + tag "$id" + input: + tuple val(id), path(inputFiles, stageAs: "_inputfile?/*"), val(outputFiles) + output: + tuple val(id), path{outputFiles} + script: + def copyCommands = [ + inputFiles instanceof List ? inputFiles : [inputFiles], + outputFiles instanceof List ? 
outputFiles : [outputFiles] + ] + .transpose() + .collectMany{infile, outfile -> + if (infile.toString() != outfile.toString()) { + [ + "[ -d \"\$(dirname '${outfile.toString()}')\" ] || mkdir -p \"\$(dirname '${outfile.toString()}')\"", + "cp -r '${infile.toString()}' '${outfile.toString()}'" + ] + } else { + // no need to copy if infile is the same as outfile + [] + } + } + """ + echo "Copying output files to destination folder" + ${copyCommands.join("\n ")} + """ +} + + +// this assumes that the state contains no other values other than those specified in the config +def publishFilesByConfig(Map args) { + def config = args.get("config") + assert config != null : "publishFilesByConfig: config must be specified" + + def key_ = args.get("key", config.name) + assert key_ != null : "publishFilesByConfig: key must be specified" + + workflow publishFilesSimpleWf { + take: input_ch + main: + input_ch + | map { tup -> + def id_ = tup[0] + def state_ = tup[1] // e.g. [output: new File("myoutput.h5ad"), k: 10] + def origState_ = tup[2] // e.g. 
[output: '$id.$key.foo.h5ad'] + + + // the processed state is a list of [key, value, inputPath, outputFilename] tuples, where + // - key is a String + // - value is any object that can be serialized to a Yaml (so a String/Integer/Long/Double/Boolean, a List, a Map, or a Path) + // - inputPath is a List[Path] + // - outputFilename is a List[String] + // - (inputPath, outputFilename) are the files that will be copied from src to dest (relative to the state.yaml) + def processedState = + config.allArguments + .findAll { it.direction == "output" } + .collectMany { par -> + def plainName_ = par.plainName + // if the state does not contain the key, it's an + // optional argument for which the component did + // not generate any output OR multiple channels were emitted + // and the output was just not added to using the channel + // that is now being parsed + if (!state_.containsKey(plainName_)) { + return [] + } + def value = state_[plainName_] + // if the parameter is not a file, it should be stored + // in the state as-is, but is not something that needs + // to be copied from the source path to the dest path + if (par.type != "file") { + return [[inputPath: [], outputFilename: []]] + } + // if the orig state does not contain this filename, + // it's an optional argument for which the user specified + // that it should not be returned as a state + if (!origState_.containsKey(plainName_)) { + return [] + } + def filenameTemplate = origState_[plainName_] + // if the pararameter is multiple: true, fetch the template + if (par.multiple && filenameTemplate instanceof List) { + filenameTemplate = filenameTemplate[0] + } + // instantiate the template + def filename = filenameTemplate + .replaceAll('\\$id', id_) + .replaceAll('\\$\\{id\\}', id_) + .replaceAll('\\$key', key_) + .replaceAll('\\$\\{key\\}', key_) + if (par.multiple) { + // if the parameter is multiple: true, the filename + // should contain a wildcard '*' that is replaced with + // the index of the file + assert 
filename.contains("*") : "Module '${key_}' id '${id_}': Multiple output files specified, but no wildcard '*' in the filename: ${filename}" + def outputPerFile = value.withIndex().collect{ val, ix -> + def filename_ix = filename.replace("*", ix.toString()) + def inputPath = val instanceof File ? val.toPath() : val + [inputPath: inputPath, outputFilename: filename_ix] + } + def transposedOutputs = ["inputPath", "outputFilename"].collectEntries{ key -> + [key, outputPerFile.collect{dic -> dic[key]}] + } + return [[key: plainName_] + transposedOutputs] + } else { + def value_ = java.nio.file.Paths.get(filename) + def inputPath = value instanceof File ? value.toPath() : value + return [[inputPath: [inputPath], outputFilename: [filename]]] + } + } + + def inputPaths = processedState.collectMany{it.inputPath} + def outputFilenames = processedState.collectMany{it.outputFilename} + + + [id_, inputPaths, outputFilenames] + } + | publishFilesProc + emit: input_ch + } + return publishFilesSimpleWf +} + + + + // helper file: 'src/main/resources/io/viash/runners/nextflow/states/publishStates.nf' def collectFiles(obj) { if (obj instanceof java.io.File || obj instanceof Path) { @@ -1723,8 +1886,6 @@ def publishStates(Map args) { // the input files and the target output filenames def inputoutputFilenames_ = collectInputOutputPaths(state_, id_ + "." 
+ key_).transpose() - def inputFiles_ = inputoutputFilenames_[0] - def outputFilenames_ = inputoutputFilenames_[1] def yamlFilename = yamlTemplate_ .replaceAll('\\$id', id_) @@ -1737,7 +1898,7 @@ def publishStates(Map args) { // convert state to yaml blob def yamlBlob_ = toRelativeTaggedYamlBlob([id: id_] + state_, java.nio.file.Paths.get(yamlFilename)) - [id_, yamlBlob_, yamlFilename, inputFiles_, outputFilenames_] + [id_, yamlBlob_, yamlFilename] } | publishStatesProc emit: input_ch @@ -1749,33 +1910,17 @@ process publishStatesProc { publishDir path: "${getPublishDir()}/", mode: "copy" tag "$id" input: - tuple val(id), val(yamlBlob), val(yamlFile), path(inputFiles, stageAs: "_inputfile?/*"), val(outputFiles) + tuple val(id), val(yamlBlob), val(yamlFile) output: - tuple val(id), path{[yamlFile] + outputFiles} + tuple val(id), path{[yamlFile]} script: - def copyCommands = [ - inputFiles instanceof List ? inputFiles : [inputFiles], - outputFiles instanceof List ? outputFiles : [outputFiles] - ] - .transpose() - .collectMany{infile, outfile -> - if (infile.toString() != outfile.toString()) { - [ - "[ -d \"\$(dirname '${outfile.toString()}')\" ] || mkdir -p \"\$(dirname '${outfile.toString()}')\"", - "cp -r '${infile.toString()}' '${outfile.toString()}'" - ] - } else { - // no need to copy if infile is the same as outfile - [] - } - } """ -mkdir -p "\$(dirname '${yamlFile}')" -echo "Storing state as yaml" -echo '${yamlBlob}' > '${yamlFile}' -echo "Copying output files to destination folder" -${copyCommands.join("\n ")} -""" + mkdir -p "\$(dirname '${yamlFile}')" + echo "Storing state as yaml" + cat > '${yamlFile}' << HERE +${yamlBlob} +HERE + """ } @@ -1806,13 +1951,10 @@ def publishStatesByConfig(Map args) { .replaceAll('\\$\\{key\\}', key_) def yamlDir = java.nio.file.Paths.get(yamlFilename).getParent() - // the processed state is a list of [key, value, inputPath, outputFilename] tuples, where + // the processed state is a list of [key, value] tuples, where // - key 
is a String // - value is any object that can be serialized to a Yaml (so a String/Integer/Long/Double/Boolean, a List, a Map, or a Path) - // - inputPath is a List[Path] - // - outputFilename is a List[String] // - (key, value) are the tuples that will be saved to the state.yaml file - // - (inputPath, outputFilename) are the files that will be copied from src to dest (relative to the state.yaml) def processedState = config.allArguments .findAll { it.direction == "output" } @@ -1829,7 +1971,7 @@ def publishStatesByConfig(Map args) { // in the state as-is, but is not something that needs // to be copied from the source path to the dest path if (par.type != "file") { - return [[key: plainName_, value: value, inputPath: [], outputFilename: []]] + return [[key: plainName_, value: value]] } // if the orig state does not contain this filename, // it's an optional argument for which the user specified @@ -1860,13 +2002,9 @@ def publishStatesByConfig(Map args) { if (yamlDir != null) { value_ = yamlDir.relativize(value_) } - def inputPath = val instanceof File ? val.toPath() : val - [value: value_, inputPath: inputPath, outputFilename: filename_ix] + return value_ } - def transposedOutputs = ["value", "inputPath", "outputFilename"].collectEntries{ key -> - [key, outputPerFile.collect{dic -> dic[key]}] - } - return [[key: plainName_] + transposedOutputs] + return [["key": plainName_, "value": outputPerFile]] } else { def value_ = java.nio.file.Paths.get(filename) // if id contains a slash @@ -1874,18 +2012,17 @@ def publishStatesByConfig(Map args) { value_ = yamlDir.relativize(value_) } def inputPath = value instanceof File ? 
value.toPath() : value - return [[key: plainName_, value: value_, inputPath: [inputPath], outputFilename: [filename]]] + return [["key": plainName_, value: value_]] } } + def updatedState_ = processedState.collectEntries{[it.key, it.value]} - def inputPaths = processedState.collectMany{it.inputPath} - def outputFilenames = processedState.collectMany{it.outputFilename} // convert state to yaml blob def yamlBlob_ = toTaggedYamlBlob([id: id_] + updatedState_) - [id_, yamlBlob_, yamlFilename, inputPaths, outputFilenames] + [id_, yamlBlob_, yamlFilename] } | publishStatesProc emit: input_ch @@ -2559,7 +2696,8 @@ def _debug(workflowArgs, debugKey) { def workflowFactory(Map args, Map defaultWfArgs, Map meta) { def workflowArgs = processWorkflowArgs(args, defaultWfArgs, meta) def key_ = workflowArgs["key"] - + def multipleArgs = meta.config.allArguments.findAll{ it.multiple }.collect{it.plainName} + workflow workflowInstance { take: input_ @@ -2716,12 +2854,36 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) { } // TODO: move some of the _meta.join_id wrangling to the safeJoin() function. - def chInitialOutput = chArgsWithDefaults + def chInitialOutputMulti = chArgsWithDefaults | _debug(workflowArgs, "processed") // run workflow | innerWorkflowFactory(workflowArgs) - // check output tuple - | map { id_, output_ -> + def chInitialOutputList = chInitialOutputMulti instanceof List ? chInitialOutputMulti : [chInitialOutputMulti] + assert chInitialOutputList.size() > 0: "should have emitted at least one output channel" + // Add a channel ID to the events, which designates the channel the event was emitted from as a running number + // This number is used to sort the events later when the events are gathered from across the channels. 
+ def chInitialOutputListWithIndexedEvents = chInitialOutputList.withIndex().collect{channel, channelIndex -> + def newChannel = channel + | map {tuple -> + assert tuple instanceof List : + "Error in module '${key_}': element in output channel should be a tuple [id, data, ...otherargs...]\n" + + " Example: [\"id\", [input: file('foo.txt'), arg: 10]].\n" + + " Expected class: List. Found: tuple.getClass() is ${tuple.getClass()}" + + def newEvent = [channelIndex] + tuple + return newEvent + } + return newChannel + } + // Put the events into 1 channel, cover case where there is only one channel is emitted + def chInitialOutput = chInitialOutputList.size() > 1 ? \ + chInitialOutputListWithIndexedEvents[0].mix(*chInitialOutputListWithIndexedEvents.tail()) : \ + chInitialOutputListWithIndexedEvents[0] + def chInitialOutputProcessed = chInitialOutput + | map { tuple -> + def channelId = tuple[0] + def id_ = tuple[1] + def output_ = tuple[2] // see if output map contains metadata def meta_ = @@ -2734,19 +2896,94 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) { output_ = output_.findAll{k, v -> k != "_meta"} // check value types - output_ = _processOutputValues(output_, meta.config, id_, key_) + output_ = _checkValidOutputArgument(output_, meta.config, id_, key_) - // simplify output if need be - if (workflowArgs.auto.simplifyOutput && output_.size() == 1) { - output_ = output_.values()[0] - } - - [join_id, id_, output_] + [join_id, channelId, id_, output_] } // | view{"chInitialOutput: ${it.take(3)}"} + // join the output [prev_id, channel_id, new_id, output] with the previous state [prev_id, state, ...] + def chPublishWithPreviousState = safeJoin(chInitialOutputProcessed, chRunFiltered, key_) + // input tuple format: [join_id, channel_id, id, output, prev_state, ...] + // output tuple format: [join_id, channel_id, id, new_state, ...] 
+ | map{ tup -> + def new_state = workflowArgs.toState(tup.drop(2).take(3)) + tup.take(3) + [new_state] + tup.drop(5) + } + if (workflowArgs.auto.publish == "state") { + def chPublishFiles = chPublishWithPreviousState + // input tuple format: [join_id, channel_id, id, new_state, ...] + // output tuple format: [join_id, channel_id, id, new_state] + | map{ tup -> + tup.take(4) + } + + safeJoin(chPublishFiles, chArgsWithDefaults, key_) + // input tuple format: [join_id, channel_id, id, new_state, orig_state, ...] + // output tuple format: [id, new_state, orig_state] + | map { tup -> + tup.drop(2).take(3) + } + | publishFilesByConfig(key: key_, config: meta.config) + } + // Join the state from the events that were emitted from different channels + def chJoined = chInitialOutputProcessed + | map {tuple -> + def join_id = tuple[0] + def channel_id = tuple[1] + def id = tuple[2] + def other = tuple.drop(3) + // Below, groupTuple is used to join the events. To make sure resuming a workflow + // keeps working, the output state must be deterministic. This means the state needs to be + // sorted with groupTuple's has a 'sort' argument. This argument can be set to 'hash', + // but hashing the state when it is large can be problematic in terms of performance. + // Therefore, a custom comparator function is provided. We add the channel ID to the + // states so that we can use the channel ID to sort the items. + def stateWithChannelID = [[channel_id] * other.size(), other].transpose() + // A comparator that is provided to groupTuple's 'sort' argument is applied + // to all elements of the event tuple (that is not the 'id'). The comparator + // closure that is used below expects the input to be List. So the join_id and + // channel_id must also be wrapped in a list. 
+ [[join_id], [channel_id], id] + stateWithChannelID + } + | groupTuple(by: 2, sort: {a, b -> a[0] <=> b[0]}, size: chInitialOutputList.size(), remainder: true) + | map {join_ids, _, id, statesWithChannelID -> + // Remove the channel IDs from the states + def states = statesWithChannelID.collect{it[1]} + def newJoinId = join_ids.flatten().unique{a, b -> a <=> b} + assert newJoinId.size() == 1: "Multiple events were emitted for '$id'." + def newJoinIdUnique = newJoinId[0] + + // Merge the states from the different channels + def newState = states.inject([:]){ old_state, state_to_add -> + return old_state + state_to_add.collectEntries{k, v -> + if (!multipleArgs.contains(k)) { + // if the key is not a multiple argument, we expect only one value + if (old_state.containsKey(k)) { + assert old_state[k] == v : "ID $id: multiple entries for argument $k were emitted." + } + [k, v] + } else { + // if the key is a multiple argument, append the different values into one list + def prevValue = old_state.getOrDefault(k, []) + def prevValueAsList = prevValue instanceof List ? prevValue : [prevValue] + [k, prevValueAsList + v] + } + } + } + + _checkAllRequiredOuputsPresent(newState, meta.config, id, key_) + + // simplify output if need be + if (workflowArgs.auto.simplifyOutput && newState.size() == 1) { + newState = newState.values()[0] + } + + return [newJoinIdUnique, id, newState] + } + // join the output [prev_id, new_id, output] with the previous state [prev_id, state, ...] - def chNewState = safeJoin(chInitialOutput, chRunFiltered, key_) + def chNewState = safeJoin(chJoined, chRunFiltered, key_) // input tuple format: [join_id, id, output, prev_state, ...] // output tuple format: [join_id, id, new_state, ...] | map{ tup -> @@ -2755,23 +2992,21 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) { } if (workflowArgs.auto.publish == "state") { - def chPublish = chNewState + def chPublishStates = chNewState // input tuple format: [join_id, id, new_state, ...] 
// output tuple format: [join_id, id, new_state] | map{ tup -> tup.take(3) } - safeJoin(chPublish, chArgsWithDefaults, key_) + safeJoin(chPublishStates, chArgsWithDefaults, key_) // input tuple format: [join_id, id, new_state, orig_state, ...] // output tuple format: [id, new_state, orig_state] | map { tup -> tup.drop(1).take(3) - } + } | publishStatesByConfig(key: key_, config: meta.config) } - - // remove join_id and meta chReturn = chNewState | map { tup -> // input tuple format: [join_id, id, new_state, ...] @@ -2806,6 +3041,56 @@ meta = [ "config": processConfig(readJsonBlob('''{ "name" : "sync_resources", "version" : "main", + "authors" : [ + { + "name" : "Robrecht Cannoodt", + "roles" : [ + "author", + "maintainer" + ], + "info" : { + "links" : { + "email" : "robrecht@data-intuitive.com", + "github" : "rcannood", + "orcid" : "0000-0003-3641-729X", + "linkedin" : "robrechtcannoodt" + }, + "organizations" : [ + { + "name" : "Data Intuitive", + "href" : "https://www.data-intuitive.com", + "role" : "Data Science Engineer" + }, + { + "name" : "Open Problems", + "href" : "https://openproblems.bio", + "role" : "Core Member" + } + ] + } + }, + { + "name" : "Dries Schaumont", + "roles" : [ + "reviewer" + ], + "info" : { + "links" : { + "email" : "dries@data-intuitive.com", + "github" : "DriesSchaumont", + "orcid" : "0000-0002-4389-0440", + "linkedin" : "dries-schaumont" + }, + "organizations" : [ + { + "name" : "Data Intuitive", + "href" : "https://www.data-intuitive.com", + "role" : "Data Scientist" + } + ] + } + } + ], "argument_groups" : [ { "name" : "Inputs", @@ -2879,7 +3164,8 @@ meta = [ "is_executable" : true } ], - "description" : "Sync a Viash package's test resources to the local filesystem", + "summary" : "Sync a Viash package's test resources to the local filesystem", + "description" : "Sync a Viash package's test resources to the local filesystem based on the\nthe `.info.test_resources` field in the `_viash.yaml` file. 
This is useful for\ntesting and debugging purposes.\n", "usage" : "sync_resources\nsync_resources --input _viash.yaml --output .\n", "test_resources" : [ { @@ -2889,6 +3175,10 @@ meta = [ } ], "status" : "enabled", + "scope" : { + "image" : "public", + "target" : "public" + }, "requirements" : { "commands" : [ "ps" @@ -3008,16 +3298,16 @@ meta = [ "runner" : "nextflow", "engine" : "docker|native", "output" : "target/nextflow/sync_resources", - "viash_version" : "0.9.0", - "git_commit" : "3c8413009764e3a6839e3e8b038857caf7047593", + "viash_version" : "0.9.3", + "git_commit" : "8f9353f15e4d6952eca57e896f962a60b42b0a3c", "git_remote" : "https://github.com/viash-hub/craftbox", - "git_tag" : "v0.1.0-3-g3c84130" + "git_tag" : "v0.1.0-4-g8f9353f" }, "package_config" : { "name" : "craftbox", "version" : "main", "description" : "A collection of custom-tailored scripts and applied tools.\n", - "viash_version" : "0.9.0", + "viash_version" : "0.9.3", "source" : "src", "target" : "target", "config_mods" : [ @@ -3438,7 +3728,7 @@ def _vdsl3ProcessFactory(Map workflowArgs, Map meta, String rawScript) { // create process from temp file def binding = new nextflow.script.ScriptBinding([:]) def session = nextflow.Nextflow.getSession() - def parser = new nextflow.script.ScriptParser(session) + def parser = _getScriptLoader(session) .setModule(true) .setBinding(binding) def moduleScript = parser.runScript(tempFile) @@ -3452,6 +3742,27 @@ def _vdsl3ProcessFactory(Map workflowArgs, Map meta, String rawScript) { return scriptMeta.getProcess(procKey) } +// use Reflection to get a ScriptParser / ScriptLoader +// <25.02.0-edge: new nextflow.script.ScriptParser(session) +// >=25.02.0-edge: nextflow.script.ScriptLoaderFactory.create(session) +def _getScriptLoader(nextflow.Session session) { + // try using the old method + try { + Class scriptParserClass = Class.forName('nextflow.script.ScriptParser') + return scriptParserClass.getDeclaredConstructor(nextflow.Session).newInstance(session) + } 
catch (ClassNotFoundException e) { + // else try with the new method + try { + Class scriptLoaderFactoryClass = Class.forName('nextflow.script.ScriptLoaderFactory') + def createMethod = scriptLoaderFactoryClass.getDeclaredMethod('create', nextflow.Session) + return createMethod.invoke(null, session) // null because create is static + } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | java.lang.reflect.InvocationTargetException e2) { + // Handle the case where neither class is found + throw new Exception("Neither nextflow.script.ScriptParser nor nextflow.script.ScriptLoaderFactory could be found. Is this a compatible Nextflow version?", e2) + } + } +} + // defaults meta["defaults"] = [ // key to be used to trace the process and determine output names diff --git a/target/nextflow/sync_resources/nextflow.config b/target/nextflow/sync_resources/nextflow.config index 857c2a4..96d0f28 100644 --- a/target/nextflow/sync_resources/nextflow.config +++ b/target/nextflow/sync_resources/nextflow.config @@ -3,7 +3,8 @@ manifest { mainScript = 'main.nf' nextflowVersion = '!>=20.12.1-edge' version = 'main' - description = 'Sync a Viash package\'s test resources to the local filesystem' + description = 'Sync a Viash package\'s test resources to the local filesystem based on the\nthe `.info.test_resources` field in the `_viash.yaml` file. 
This is useful for\ntesting and debugging purposes.\n' + author = 'Robrecht Cannoodt, Dries Schaumont' } process.container = 'nextflow/bash:latest' diff --git a/target/nextflow/sync_resources/nextflow_schema.json b/target/nextflow/sync_resources/nextflow_schema.json index eb472c5..88cab49 100644 --- a/target/nextflow/sync_resources/nextflow_schema.json +++ b/target/nextflow/sync_resources/nextflow_schema.json @@ -1,7 +1,7 @@ { "$schema": "http://json-schema.org/draft-07/schema", "title": "sync_resources", -"description": "Sync a Viash package\u0027s test resources to the local filesystem", +"description": "Sync a Viash package\u0027s test resources to the local filesystem based on the\nthe `.info.test_resources` field in the `_viash.yaml` file. This is useful for\ntesting and debugging purposes.\n", "type": "object", "definitions": { diff --git a/target/nextflow/untar/.config.vsh.yaml b/target/nextflow/untar/.config.vsh.yaml index 23ac15f..51d078d 100644 --- a/target/nextflow/untar/.config.vsh.yaml +++ b/target/nextflow/untar/.config.vsh.yaml @@ -1,5 +1,36 @@ name: "untar" version: "main" +authors: +- name: "Dries Schaumont" + roles: + - "author" + - "maintainer" + info: + links: + email: "dries@data-intuitive.com" + github: "DriesSchaumont" + orcid: "0000-0002-4389-0440" + linkedin: "dries-schaumont" + organizations: + - name: "Data Intuitive" + href: "https://www.data-intuitive.com" + role: "Data Scientist" +- name: "Robrecht Cannoodt" + roles: + - "reviewer" + info: + links: + email: "robrecht@data-intuitive.com" + github: "rcannood" + orcid: "0000-0003-3641-729X" + linkedin: "robrechtcannoodt" + organizations: + - name: "Data Intuitive" + href: "https://www.data-intuitive.com" + role: "Data Science Engineer" + - name: "Open Problems" + href: "https://openproblems.bio" + role: "Core Member" argument_groups: - name: "Input arguments" arguments: @@ -44,6 +75,7 @@ resources: - type: "bash_script" path: "script.sh" is_executable: true +summary: "Unpack a .tar file" 
description: "Unpack a .tar file. When the contents of the .tar file is just a single\ \ directory,\nput the contents of the directory into the output folder instead of\ \ that directory.\n" @@ -53,6 +85,9 @@ test_resources: is_executable: true info: null status: "enabled" +scope: + image: "public" + target: "public" requirements: commands: - "ps" @@ -146,16 +181,16 @@ build_info: engine: "docker|native" output: "target/nextflow/untar" executable: "target/nextflow/untar/main.nf" - viash_version: "0.9.0" - git_commit: "3c8413009764e3a6839e3e8b038857caf7047593" + viash_version: "0.9.3" + git_commit: "8f9353f15e4d6952eca57e896f962a60b42b0a3c" git_remote: "https://github.com/viash-hub/craftbox" - git_tag: "v0.1.0-3-g3c84130" + git_tag: "v0.1.0-4-g8f9353f" package_config: name: "craftbox" version: "main" description: "A collection of custom-tailored scripts and applied tools.\n" info: null - viash_version: "0.9.0" + viash_version: "0.9.3" source: "src" target: "target" config_mods: diff --git a/target/nextflow/untar/main.nf b/target/nextflow/untar/main.nf index 415c3d4..b4b50d5 100644 --- a/target/nextflow/untar/main.nf +++ b/target/nextflow/untar/main.nf @@ -1,6 +1,6 @@ // untar main // -// This wrapper script is auto-generated by viash 0.9.0 and is thus a derivative +// This wrapper script is auto-generated by viash 0.9.3 and is thus a derivative // work thereof. This software comes with ABSOLUTELY NO WARRANTY from Data // Intuitive. // @@ -8,6 +8,10 @@ // authors of this component should specify the license in the header of such // files, or include a separate license file detailing the licenses of all included // files. 
+// +// Component authors: +// * Dries Schaumont (author, maintainer) +// * Robrecht Cannoodt (reviewer) //////////////////////////// // VDSL3 helper functions // @@ -173,7 +177,7 @@ def _checkArgumentType(String stage, Map par, Object value, String errorIdentifi Map _processInputValues(Map inputs, Map config, String id, String key) { if (!workflow.stubRun) { config.allArguments.each { arg -> - if (arg.required) { + if (arg.required && arg.direction == "input") { assert inputs.containsKey(arg.plainName) && inputs.get(arg.plainName) != null : "Error in module '${key}' id '${id}': required input argument '${arg.plainName}' is missing" } @@ -192,15 +196,8 @@ Map _processInputValues(Map inputs, Map config, String id, String key) { } // helper file: 'src/main/resources/io/viash/runners/nextflow/arguments/_processOutputValues.nf' -Map _processOutputValues(Map outputs, Map config, String id, String key) { +Map _checkValidOutputArgument(Map outputs, Map config, String id, String key) { if (!workflow.stubRun) { - config.allArguments.each { arg -> - if (arg.direction == "output" && arg.required) { - assert outputs.containsKey(arg.plainName) && outputs.get(arg.plainName) != null : - "Error in module '${key}' id '${id}': required output argument '${arg.plainName}' is missing" - } - } - outputs = outputs.collectEntries { name, value -> def par = config.allArguments.find { it.plainName == name && it.direction == "output" } assert par != null : "Error in module '${key}' id '${id}': '${name}' is not a valid output argument" @@ -213,6 +210,16 @@ Map _processOutputValues(Map outputs, Map config, String id, String key) { return outputs } +void _checkAllRequiredOuputsPresent(Map outputs, Map config, String id, String key) { + if (!workflow.stubRun) { + config.allArguments.each { arg -> + if (arg.direction == "output" && arg.required) { + assert outputs.containsKey(arg.plainName) && outputs.get(arg.plainName) != null : + "Error in module '${key}' id '${id}': required output argument 
'${arg.plainName}' is missing" + } + } + } +} // helper file: 'src/main/resources/io/viash/runners/nextflow/channel/IDChecker.nf' class IDChecker { final def items = [] as Set @@ -1666,6 +1673,162 @@ def joinStates(Closure apply_) { } return joinStatesWf } +// helper file: 'src/main/resources/io/viash/runners/nextflow/states/publishFiles.nf' +def publishFiles(Map args) { + def key_ = args.get("key") + + assert key_ != null : "publishFiles: key must be specified" + + workflow publishFilesWf { + take: input_ch + main: + input_ch + | map { tup -> + def id_ = tup[0] + def state_ = tup[1] + + // the input files and the target output filenames + def inputoutputFilenames_ = collectInputOutputPaths(state_, id_ + "." + key_).transpose() + def inputFiles_ = inputoutputFilenames_[0] + def outputFilenames_ = inputoutputFilenames_[1] + + [id_, inputFiles_, outputFilenames_] + } + | publishFilesProc + emit: input_ch + } + return publishFilesWf +} + +process publishFilesProc { + // todo: check publishpath? + publishDir path: "${getPublishDir()}/", mode: "copy" + tag "$id" + input: + tuple val(id), path(inputFiles, stageAs: "_inputfile?/*"), val(outputFiles) + output: + tuple val(id), path{outputFiles} + script: + def copyCommands = [ + inputFiles instanceof List ? inputFiles : [inputFiles], + outputFiles instanceof List ? 
outputFiles : [outputFiles] + ] + .transpose() + .collectMany{infile, outfile -> + if (infile.toString() != outfile.toString()) { + [ + "[ -d \"\$(dirname '${outfile.toString()}')\" ] || mkdir -p \"\$(dirname '${outfile.toString()}')\"", + "cp -r '${infile.toString()}' '${outfile.toString()}'" + ] + } else { + // no need to copy if infile is the same as outfile + [] + } + } + """ + echo "Copying output files to destination folder" + ${copyCommands.join("\n ")} + """ +} + + +// this assumes that the state contains no other values other than those specified in the config +def publishFilesByConfig(Map args) { + def config = args.get("config") + assert config != null : "publishFilesByConfig: config must be specified" + + def key_ = args.get("key", config.name) + assert key_ != null : "publishFilesByConfig: key must be specified" + + workflow publishFilesSimpleWf { + take: input_ch + main: + input_ch + | map { tup -> + def id_ = tup[0] + def state_ = tup[1] // e.g. [output: new File("myoutput.h5ad"), k: 10] + def origState_ = tup[2] // e.g. 
[output: '$id.$key.foo.h5ad'] + + + // the processed state is a list of [key, value, inputPath, outputFilename] tuples, where + // - key is a String + // - value is any object that can be serialized to a Yaml (so a String/Integer/Long/Double/Boolean, a List, a Map, or a Path) + // - inputPath is a List[Path] + // - outputFilename is a List[String] + // - (inputPath, outputFilename) are the files that will be copied from src to dest (relative to the state.yaml) + def processedState = + config.allArguments + .findAll { it.direction == "output" } + .collectMany { par -> + def plainName_ = par.plainName + // if the state does not contain the key, it's an + // optional argument for which the component did + // not generate any output OR multiple channels were emitted + // and the output was just not added to using the channel + // that is now being parsed + if (!state_.containsKey(plainName_)) { + return [] + } + def value = state_[plainName_] + // if the parameter is not a file, it should be stored + // in the state as-is, but is not something that needs + // to be copied from the source path to the dest path + if (par.type != "file") { + return [[inputPath: [], outputFilename: []]] + } + // if the orig state does not contain this filename, + // it's an optional argument for which the user specified + // that it should not be returned as a state + if (!origState_.containsKey(plainName_)) { + return [] + } + def filenameTemplate = origState_[plainName_] + // if the pararameter is multiple: true, fetch the template + if (par.multiple && filenameTemplate instanceof List) { + filenameTemplate = filenameTemplate[0] + } + // instantiate the template + def filename = filenameTemplate + .replaceAll('\\$id', id_) + .replaceAll('\\$\\{id\\}', id_) + .replaceAll('\\$key', key_) + .replaceAll('\\$\\{key\\}', key_) + if (par.multiple) { + // if the parameter is multiple: true, the filename + // should contain a wildcard '*' that is replaced with + // the index of the file + assert 
filename.contains("*") : "Module '${key_}' id '${id_}': Multiple output files specified, but no wildcard '*' in the filename: ${filename}" + def outputPerFile = value.withIndex().collect{ val, ix -> + def filename_ix = filename.replace("*", ix.toString()) + def inputPath = val instanceof File ? val.toPath() : val + [inputPath: inputPath, outputFilename: filename_ix] + } + def transposedOutputs = ["inputPath", "outputFilename"].collectEntries{ key -> + [key, outputPerFile.collect{dic -> dic[key]}] + } + return [[key: plainName_] + transposedOutputs] + } else { + def value_ = java.nio.file.Paths.get(filename) + def inputPath = value instanceof File ? value.toPath() : value + return [[inputPath: [inputPath], outputFilename: [filename]]] + } + } + + def inputPaths = processedState.collectMany{it.inputPath} + def outputFilenames = processedState.collectMany{it.outputFilename} + + + [id_, inputPaths, outputFilenames] + } + | publishFilesProc + emit: input_ch + } + return publishFilesSimpleWf +} + + + + // helper file: 'src/main/resources/io/viash/runners/nextflow/states/publishStates.nf' def collectFiles(obj) { if (obj instanceof java.io.File || obj instanceof Path) { @@ -1723,8 +1886,6 @@ def publishStates(Map args) { // the input files and the target output filenames def inputoutputFilenames_ = collectInputOutputPaths(state_, id_ + "." 
+ key_).transpose() - def inputFiles_ = inputoutputFilenames_[0] - def outputFilenames_ = inputoutputFilenames_[1] def yamlFilename = yamlTemplate_ .replaceAll('\\$id', id_) @@ -1737,7 +1898,7 @@ def publishStates(Map args) { // convert state to yaml blob def yamlBlob_ = toRelativeTaggedYamlBlob([id: id_] + state_, java.nio.file.Paths.get(yamlFilename)) - [id_, yamlBlob_, yamlFilename, inputFiles_, outputFilenames_] + [id_, yamlBlob_, yamlFilename] } | publishStatesProc emit: input_ch @@ -1749,33 +1910,17 @@ process publishStatesProc { publishDir path: "${getPublishDir()}/", mode: "copy" tag "$id" input: - tuple val(id), val(yamlBlob), val(yamlFile), path(inputFiles, stageAs: "_inputfile?/*"), val(outputFiles) + tuple val(id), val(yamlBlob), val(yamlFile) output: - tuple val(id), path{[yamlFile] + outputFiles} + tuple val(id), path{[yamlFile]} script: - def copyCommands = [ - inputFiles instanceof List ? inputFiles : [inputFiles], - outputFiles instanceof List ? outputFiles : [outputFiles] - ] - .transpose() - .collectMany{infile, outfile -> - if (infile.toString() != outfile.toString()) { - [ - "[ -d \"\$(dirname '${outfile.toString()}')\" ] || mkdir -p \"\$(dirname '${outfile.toString()}')\"", - "cp -r '${infile.toString()}' '${outfile.toString()}'" - ] - } else { - // no need to copy if infile is the same as outfile - [] - } - } """ -mkdir -p "\$(dirname '${yamlFile}')" -echo "Storing state as yaml" -echo '${yamlBlob}' > '${yamlFile}' -echo "Copying output files to destination folder" -${copyCommands.join("\n ")} -""" + mkdir -p "\$(dirname '${yamlFile}')" + echo "Storing state as yaml" + cat > '${yamlFile}' << HERE +${yamlBlob} +HERE + """ } @@ -1806,13 +1951,10 @@ def publishStatesByConfig(Map args) { .replaceAll('\\$\\{key\\}', key_) def yamlDir = java.nio.file.Paths.get(yamlFilename).getParent() - // the processed state is a list of [key, value, inputPath, outputFilename] tuples, where + // the processed state is a list of [key, value] tuples, where // - key 
is a String // - value is any object that can be serialized to a Yaml (so a String/Integer/Long/Double/Boolean, a List, a Map, or a Path) - // - inputPath is a List[Path] - // - outputFilename is a List[String] // - (key, value) are the tuples that will be saved to the state.yaml file - // - (inputPath, outputFilename) are the files that will be copied from src to dest (relative to the state.yaml) def processedState = config.allArguments .findAll { it.direction == "output" } @@ -1829,7 +1971,7 @@ def publishStatesByConfig(Map args) { // in the state as-is, but is not something that needs // to be copied from the source path to the dest path if (par.type != "file") { - return [[key: plainName_, value: value, inputPath: [], outputFilename: []]] + return [[key: plainName_, value: value]] } // if the orig state does not contain this filename, // it's an optional argument for which the user specified @@ -1860,13 +2002,9 @@ def publishStatesByConfig(Map args) { if (yamlDir != null) { value_ = yamlDir.relativize(value_) } - def inputPath = val instanceof File ? val.toPath() : val - [value: value_, inputPath: inputPath, outputFilename: filename_ix] + return value_ } - def transposedOutputs = ["value", "inputPath", "outputFilename"].collectEntries{ key -> - [key, outputPerFile.collect{dic -> dic[key]}] - } - return [[key: plainName_] + transposedOutputs] + return [["key": plainName_, "value": outputPerFile]] } else { def value_ = java.nio.file.Paths.get(filename) // if id contains a slash @@ -1874,18 +2012,17 @@ def publishStatesByConfig(Map args) { value_ = yamlDir.relativize(value_) } def inputPath = value instanceof File ? 
value.toPath() : value - return [[key: plainName_, value: value_, inputPath: [inputPath], outputFilename: [filename]]] + return [["key": plainName_, value: value_]] } } + def updatedState_ = processedState.collectEntries{[it.key, it.value]} - def inputPaths = processedState.collectMany{it.inputPath} - def outputFilenames = processedState.collectMany{it.outputFilename} // convert state to yaml blob def yamlBlob_ = toTaggedYamlBlob([id: id_] + updatedState_) - [id_, yamlBlob_, yamlFilename, inputPaths, outputFilenames] + [id_, yamlBlob_, yamlFilename] } | publishStatesProc emit: input_ch @@ -2559,7 +2696,8 @@ def _debug(workflowArgs, debugKey) { def workflowFactory(Map args, Map defaultWfArgs, Map meta) { def workflowArgs = processWorkflowArgs(args, defaultWfArgs, meta) def key_ = workflowArgs["key"] - + def multipleArgs = meta.config.allArguments.findAll{ it.multiple }.collect{it.plainName} + workflow workflowInstance { take: input_ @@ -2716,12 +2854,36 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) { } // TODO: move some of the _meta.join_id wrangling to the safeJoin() function. - def chInitialOutput = chArgsWithDefaults + def chInitialOutputMulti = chArgsWithDefaults | _debug(workflowArgs, "processed") // run workflow | innerWorkflowFactory(workflowArgs) - // check output tuple - | map { id_, output_ -> + def chInitialOutputList = chInitialOutputMulti instanceof List ? chInitialOutputMulti : [chInitialOutputMulti] + assert chInitialOutputList.size() > 0: "should have emitted at least one output channel" + // Add a channel ID to the events, which designates the channel the event was emitted from as a running number + // This number is used to sort the events later when the events are gathered from across the channels. 
+ def chInitialOutputListWithIndexedEvents = chInitialOutputList.withIndex().collect{channel, channelIndex -> + def newChannel = channel + | map {tuple -> + assert tuple instanceof List : + "Error in module '${key_}': element in output channel should be a tuple [id, data, ...otherargs...]\n" + + " Example: [\"id\", [input: file('foo.txt'), arg: 10]].\n" + + " Expected class: List. Found: tuple.getClass() is ${tuple.getClass()}" + + def newEvent = [channelIndex] + tuple + return newEvent + } + return newChannel + } + // Put the events into 1 channel, cover case where there is only one channel is emitted + def chInitialOutput = chInitialOutputList.size() > 1 ? \ + chInitialOutputListWithIndexedEvents[0].mix(*chInitialOutputListWithIndexedEvents.tail()) : \ + chInitialOutputListWithIndexedEvents[0] + def chInitialOutputProcessed = chInitialOutput + | map { tuple -> + def channelId = tuple[0] + def id_ = tuple[1] + def output_ = tuple[2] // see if output map contains metadata def meta_ = @@ -2734,19 +2896,94 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) { output_ = output_.findAll{k, v -> k != "_meta"} // check value types - output_ = _processOutputValues(output_, meta.config, id_, key_) + output_ = _checkValidOutputArgument(output_, meta.config, id_, key_) - // simplify output if need be - if (workflowArgs.auto.simplifyOutput && output_.size() == 1) { - output_ = output_.values()[0] - } - - [join_id, id_, output_] + [join_id, channelId, id_, output_] } // | view{"chInitialOutput: ${it.take(3)}"} + // join the output [prev_id, channel_id, new_id, output] with the previous state [prev_id, state, ...] + def chPublishWithPreviousState = safeJoin(chInitialOutputProcessed, chRunFiltered, key_) + // input tuple format: [join_id, channel_id, id, output, prev_state, ...] + // output tuple format: [join_id, channel_id, id, new_state, ...] 
+ | map{ tup -> + def new_state = workflowArgs.toState(tup.drop(2).take(3)) + tup.take(3) + [new_state] + tup.drop(5) + } + if (workflowArgs.auto.publish == "state") { + def chPublishFiles = chPublishWithPreviousState + // input tuple format: [join_id, channel_id, id, new_state, ...] + // output tuple format: [join_id, channel_id, id, new_state] + | map{ tup -> + tup.take(4) + } + + safeJoin(chPublishFiles, chArgsWithDefaults, key_) + // input tuple format: [join_id, channel_id, id, new_state, orig_state, ...] + // output tuple format: [id, new_state, orig_state] + | map { tup -> + tup.drop(2).take(3) + } + | publishFilesByConfig(key: key_, config: meta.config) + } + // Join the state from the events that were emitted from different channels + def chJoined = chInitialOutputProcessed + | map {tuple -> + def join_id = tuple[0] + def channel_id = tuple[1] + def id = tuple[2] + def other = tuple.drop(3) + // Below, groupTuple is used to join the events. To make sure resuming a workflow + // keeps working, the output state must be deterministic. This means the state needs to be + // sorted with groupTuple's has a 'sort' argument. This argument can be set to 'hash', + // but hashing the state when it is large can be problematic in terms of performance. + // Therefore, a custom comparator function is provided. We add the channel ID to the + // states so that we can use the channel ID to sort the items. + def stateWithChannelID = [[channel_id] * other.size(), other].transpose() + // A comparator that is provided to groupTuple's 'sort' argument is applied + // to all elements of the event tuple (that is not the 'id'). The comparator + // closure that is used below expects the input to be List. So the join_id and + // channel_id must also be wrapped in a list. 
+ [[join_id], [channel_id], id] + stateWithChannelID + } + | groupTuple(by: 2, sort: {a, b -> a[0] <=> b[0]}, size: chInitialOutputList.size(), remainder: true) + | map {join_ids, _, id, statesWithChannelID -> + // Remove the channel IDs from the states + def states = statesWithChannelID.collect{it[1]} + def newJoinId = join_ids.flatten().unique{a, b -> a <=> b} + assert newJoinId.size() == 1: "Multiple events were emitted for '$id'." + def newJoinIdUnique = newJoinId[0] + + // Merge the states from the different channels + def newState = states.inject([:]){ old_state, state_to_add -> + return old_state + state_to_add.collectEntries{k, v -> + if (!multipleArgs.contains(k)) { + // if the key is not a multiple argument, we expect only one value + if (old_state.containsKey(k)) { + assert old_state[k] == v : "ID $id: multiple entries for argument $k were emitted." + } + [k, v] + } else { + // if the key is a multiple argument, append the different values into one list + def prevValue = old_state.getOrDefault(k, []) + def prevValueAsList = prevValue instanceof List ? prevValue : [prevValue] + [k, prevValueAsList + v] + } + } + } + + _checkAllRequiredOuputsPresent(newState, meta.config, id, key_) + + // simplify output if need be + if (workflowArgs.auto.simplifyOutput && newState.size() == 1) { + newState = newState.values()[0] + } + + return [newJoinIdUnique, id, newState] + } + // join the output [prev_id, new_id, output] with the previous state [prev_id, state, ...] - def chNewState = safeJoin(chInitialOutput, chRunFiltered, key_) + def chNewState = safeJoin(chJoined, chRunFiltered, key_) // input tuple format: [join_id, id, output, prev_state, ...] // output tuple format: [join_id, id, new_state, ...] | map{ tup -> @@ -2755,23 +2992,21 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) { } if (workflowArgs.auto.publish == "state") { - def chPublish = chNewState + def chPublishStates = chNewState // input tuple format: [join_id, id, new_state, ...] 
// output tuple format: [join_id, id, new_state] | map{ tup -> tup.take(3) } - safeJoin(chPublish, chArgsWithDefaults, key_) + safeJoin(chPublishStates, chArgsWithDefaults, key_) // input tuple format: [join_id, id, new_state, orig_state, ...] // output tuple format: [id, new_state, orig_state] | map { tup -> tup.drop(1).take(3) - } + } | publishStatesByConfig(key: key_, config: meta.config) } - - // remove join_id and meta chReturn = chNewState | map { tup -> // input tuple format: [join_id, id, new_state, ...] @@ -2806,6 +3041,56 @@ meta = [ "config": processConfig(readJsonBlob('''{ "name" : "untar", "version" : "main", + "authors" : [ + { + "name" : "Dries Schaumont", + "roles" : [ + "author", + "maintainer" + ], + "info" : { + "links" : { + "email" : "dries@data-intuitive.com", + "github" : "DriesSchaumont", + "orcid" : "0000-0002-4389-0440", + "linkedin" : "dries-schaumont" + }, + "organizations" : [ + { + "name" : "Data Intuitive", + "href" : "https://www.data-intuitive.com", + "role" : "Data Scientist" + } + ] + } + }, + { + "name" : "Robrecht Cannoodt", + "roles" : [ + "reviewer" + ], + "info" : { + "links" : { + "email" : "robrecht@data-intuitive.com", + "github" : "rcannood", + "orcid" : "0000-0003-3641-729X", + "linkedin" : "robrechtcannoodt" + }, + "organizations" : [ + { + "name" : "Data Intuitive", + "href" : "https://www.data-intuitive.com", + "role" : "Data Science Engineer" + }, + { + "name" : "Open Problems", + "href" : "https://openproblems.bio", + "role" : "Core Member" + } + ] + } + } + ], "argument_groups" : [ { "name" : "Input arguments", @@ -2867,6 +3152,7 @@ meta = [ "is_executable" : true } ], + "summary" : "Unpack a .tar file", "description" : "Unpack a .tar file. 
When the contents of the .tar file is just a single directory,\nput the contents of the directory into the output folder instead of that directory.\n", "test_resources" : [ { @@ -2876,6 +3162,10 @@ meta = [ } ], "status" : "enabled", + "scope" : { + "image" : "public", + "target" : "public" + }, "requirements" : { "commands" : [ "ps" @@ -2987,16 +3277,16 @@ meta = [ "runner" : "nextflow", "engine" : "docker|native", "output" : "target/nextflow/untar", - "viash_version" : "0.9.0", - "git_commit" : "3c8413009764e3a6839e3e8b038857caf7047593", + "viash_version" : "0.9.3", + "git_commit" : "8f9353f15e4d6952eca57e896f962a60b42b0a3c", "git_remote" : "https://github.com/viash-hub/craftbox", - "git_tag" : "v0.1.0-3-g3c84130" + "git_tag" : "v0.1.0-4-g8f9353f" }, "package_config" : { "name" : "craftbox", "version" : "main", "description" : "A collection of custom-tailored scripts and applied tools.\n", - "viash_version" : "0.9.0", + "viash_version" : "0.9.3", "source" : "src", "target" : "target", "config_mods" : [ @@ -3428,7 +3718,7 @@ def _vdsl3ProcessFactory(Map workflowArgs, Map meta, String rawScript) { // create process from temp file def binding = new nextflow.script.ScriptBinding([:]) def session = nextflow.Nextflow.getSession() - def parser = new nextflow.script.ScriptParser(session) + def parser = _getScriptLoader(session) .setModule(true) .setBinding(binding) def moduleScript = parser.runScript(tempFile) @@ -3442,6 +3732,27 @@ def _vdsl3ProcessFactory(Map workflowArgs, Map meta, String rawScript) { return scriptMeta.getProcess(procKey) } +// use Reflection to get a ScriptParser / ScriptLoader +// <25.02.0-edge: new nextflow.script.ScriptParser(session) +// >=25.02.0-edge: nextflow.script.ScriptLoaderFactory.create(session) +def _getScriptLoader(nextflow.Session session) { + // try using the old method + try { + Class scriptParserClass = Class.forName('nextflow.script.ScriptParser') + return 
scriptParserClass.getDeclaredConstructor(nextflow.Session).newInstance(session) + } catch (ClassNotFoundException e) { + // else try with the new method + try { + Class scriptLoaderFactoryClass = Class.forName('nextflow.script.ScriptLoaderFactory') + def createMethod = scriptLoaderFactoryClass.getDeclaredMethod('create', nextflow.Session) + return createMethod.invoke(null, session) // null because create is static + } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | java.lang.reflect.InvocationTargetException e2) { + // Handle the case where neither class is found + throw new Exception("Neither nextflow.script.ScriptParser nor nextflow.script.ScriptLoaderFactory could be found. Is this a compatible Nextflow version?", e2) + } + } +} + // defaults meta["defaults"] = [ // key to be used to trace the process and determine output names diff --git a/target/nextflow/untar/nextflow.config b/target/nextflow/untar/nextflow.config index 51d7ab4..5b40c40 100644 --- a/target/nextflow/untar/nextflow.config +++ b/target/nextflow/untar/nextflow.config @@ -4,6 +4,7 @@ manifest { nextflowVersion = '!>=20.12.1-edge' version = 'main' description = 'Unpack a .tar file. When the contents of the .tar file is just a single directory,\nput the contents of the directory into the output folder instead of that directory.\n' + author = 'Dries Schaumont, Robrecht Cannoodt' } process.container = 'nextflow/bash:latest'