Build branch demultiplex_htrnaseq_meta with version demultiplex_htrnaseq_meta (4dc3a87)
Build pipeline: viash-hub.playground.demultiplex-htrnaseq-meta-9b99r
Source commit: 4dc3a874d2
Source message: updated test resources
This commit is contained in:
2
.gitignore
vendored
2
.gitignore
vendored
@@ -3,4 +3,4 @@
|
||||
work
|
||||
target/
|
||||
scm.config
|
||||
test_data
|
||||
test_data/
|
||||
@@ -1,6 +1,6 @@
|
||||
input: gs://viash-hub-test-data/demultiplex/v3/demultiplex_htrnaseq_meta/SingleCell-RNA_P3_2
|
||||
run_information: gs://viash-hub-test-data/demultiplex/v3/demultiplex_htrnaseq_meta/SingleCell-RNA_P3_2/SampleSheet.csv
|
||||
input: gs://viash-hub-resources/demultiplex/v3/demultiplex_htrnaseq_meta/SingleCell-RNA_P3_2
|
||||
run_information: gs://viash-hub-resources/demultiplex/v3/demultiplex_htrnaseq_meta/SingleCell-RNA_P3_2/SampleSheet.csv
|
||||
demultiplexer: bclconvert
|
||||
barcodesFasta: gs://viash-hub-test-data/demultiplex/v3/demultiplex_htrnaseq_meta/barcodes.fasta
|
||||
genomeDir: gs://viash-hub-test-data/demultiplex/v3/demultiplex_htrnaseq_meta/gencode.v41.star.sparse
|
||||
annotation: gs://viash-hub-test-data/demultiplex/v3/demultiplex_htrnaseq_meta/gencode.v41.annotation.gtf.gz
|
||||
barcodesFasta: gs://viash-hub-resources/demultiplex/v3/demultiplex_htrnaseq_meta/barcodes.fasta
|
||||
genomeDir: gs://viash-hub-resources/demultiplex/v3/demultiplex_htrnaseq_meta/gencode.v41.star.sparse
|
||||
annotation: gs://viash-hub-resources/demultiplex/v3/demultiplex_htrnaseq_meta/gencode.v41.annotation.gtf.gz
|
||||
@@ -633,6 +633,9 @@ test_resources:
|
||||
is_executable: true
|
||||
info: null
|
||||
status: "enabled"
|
||||
scope:
|
||||
image: "public"
|
||||
target: "public"
|
||||
requirements:
|
||||
commands:
|
||||
- "ps"
|
||||
@@ -739,16 +742,16 @@ build_info:
|
||||
engine: "docker|native"
|
||||
output: "target/nextflow/cutadapt"
|
||||
executable: "target/nextflow/cutadapt/main.nf"
|
||||
viash_version: "0.9.0"
|
||||
git_commit: "952ff0843093b538cbfd6fefdecf2e7a0bc9e70b"
|
||||
git_remote: "https://x-access-token:ghs_EwAUAMYJ0K4VBHlAEMs4ZP2OyQYqJM0PSfEO@github.com/viash-hub/biobox"
|
||||
git_tag: "v0.2.0-27-g952ff08"
|
||||
viash_version: "0.9.2"
|
||||
git_commit: "5f6516e9c0d95c84f3d4159a67d3de19d3ae1fde"
|
||||
git_remote: "https://github.com/viash-hub/biobox"
|
||||
git_tag: "v0.2.0-30-g5f6516e"
|
||||
package_config:
|
||||
name: "biobox"
|
||||
version: "main"
|
||||
description: "A collection of bioinformatics tools for working with sequence data.\n"
|
||||
info: null
|
||||
viash_version: "0.9.0"
|
||||
viash_version: "0.9.2"
|
||||
source: "src"
|
||||
target: "target"
|
||||
config_mods:
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
// cutadapt main
|
||||
//
|
||||
// This wrapper script is auto-generated by viash 0.9.0 and is thus a derivative
|
||||
// This wrapper script is auto-generated by viash 0.9.2 and is thus a derivative
|
||||
// work thereof. This software comes with ABSOLUTELY NO WARRANTY from Data
|
||||
// Intuitive.
|
||||
//
|
||||
@@ -176,7 +176,7 @@ def _checkArgumentType(String stage, Map par, Object value, String errorIdentifi
|
||||
Map _processInputValues(Map inputs, Map config, String id, String key) {
|
||||
if (!workflow.stubRun) {
|
||||
config.allArguments.each { arg ->
|
||||
if (arg.required) {
|
||||
if (arg.required && arg.direction == "input") {
|
||||
assert inputs.containsKey(arg.plainName) && inputs.get(arg.plainName) != null :
|
||||
"Error in module '${key}' id '${id}': required input argument '${arg.plainName}' is missing"
|
||||
}
|
||||
@@ -195,15 +195,8 @@ Map _processInputValues(Map inputs, Map config, String id, String key) {
|
||||
}
|
||||
|
||||
// helper file: 'src/main/resources/io/viash/runners/nextflow/arguments/_processOutputValues.nf'
|
||||
Map _processOutputValues(Map outputs, Map config, String id, String key) {
|
||||
Map _checkValidOutputArgument(Map outputs, Map config, String id, String key) {
|
||||
if (!workflow.stubRun) {
|
||||
config.allArguments.each { arg ->
|
||||
if (arg.direction == "output" && arg.required) {
|
||||
assert outputs.containsKey(arg.plainName) && outputs.get(arg.plainName) != null :
|
||||
"Error in module '${key}' id '${id}': required output argument '${arg.plainName}' is missing"
|
||||
}
|
||||
}
|
||||
|
||||
outputs = outputs.collectEntries { name, value ->
|
||||
def par = config.allArguments.find { it.plainName == name && it.direction == "output" }
|
||||
assert par != null : "Error in module '${key}' id '${id}': '${name}' is not a valid output argument"
|
||||
@@ -216,6 +209,16 @@ Map _processOutputValues(Map outputs, Map config, String id, String key) {
|
||||
return outputs
|
||||
}
|
||||
|
||||
void _checkAllRequiredOuputsPresent(Map outputs, Map config, String id, String key) {
|
||||
if (!workflow.stubRun) {
|
||||
config.allArguments.each { arg ->
|
||||
if (arg.direction == "output" && arg.required) {
|
||||
assert outputs.containsKey(arg.plainName) && outputs.get(arg.plainName) != null :
|
||||
"Error in module '${key}' id '${id}': required output argument '${arg.plainName}' is missing"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// helper file: 'src/main/resources/io/viash/runners/nextflow/channel/IDChecker.nf'
|
||||
class IDChecker {
|
||||
final def items = [] as Set
|
||||
@@ -1669,6 +1672,162 @@ def joinStates(Closure apply_) {
|
||||
}
|
||||
return joinStatesWf
|
||||
}
|
||||
// helper file: 'src/main/resources/io/viash/runners/nextflow/states/publishFiles.nf'
|
||||
def publishFiles(Map args) {
|
||||
def key_ = args.get("key")
|
||||
|
||||
assert key_ != null : "publishFiles: key must be specified"
|
||||
|
||||
workflow publishFilesWf {
|
||||
take: input_ch
|
||||
main:
|
||||
input_ch
|
||||
| map { tup ->
|
||||
def id_ = tup[0]
|
||||
def state_ = tup[1]
|
||||
|
||||
// the input files and the target output filenames
|
||||
def inputoutputFilenames_ = collectInputOutputPaths(state_, id_ + "." + key_).transpose()
|
||||
def inputFiles_ = inputoutputFilenames_[0]
|
||||
def outputFilenames_ = inputoutputFilenames_[1]
|
||||
|
||||
[id_, inputFiles_, outputFilenames_]
|
||||
}
|
||||
| publishFilesProc
|
||||
emit: input_ch
|
||||
}
|
||||
return publishFilesWf
|
||||
}
|
||||
|
||||
process publishFilesProc {
|
||||
// todo: check publishpath?
|
||||
publishDir path: "${getPublishDir()}/", mode: "copy"
|
||||
tag "$id"
|
||||
input:
|
||||
tuple val(id), path(inputFiles, stageAs: "_inputfile?/*"), val(outputFiles)
|
||||
output:
|
||||
tuple val(id), path{outputFiles}
|
||||
script:
|
||||
def copyCommands = [
|
||||
inputFiles instanceof List ? inputFiles : [inputFiles],
|
||||
outputFiles instanceof List ? outputFiles : [outputFiles]
|
||||
]
|
||||
.transpose()
|
||||
.collectMany{infile, outfile ->
|
||||
if (infile.toString() != outfile.toString()) {
|
||||
[
|
||||
"[ -d \"\$(dirname '${outfile.toString()}')\" ] || mkdir -p \"\$(dirname '${outfile.toString()}')\"",
|
||||
"cp -r '${infile.toString()}' '${outfile.toString()}'"
|
||||
]
|
||||
} else {
|
||||
// no need to copy if infile is the same as outfile
|
||||
[]
|
||||
}
|
||||
}
|
||||
"""
|
||||
echo "Copying output files to destination folder"
|
||||
${copyCommands.join("\n ")}
|
||||
"""
|
||||
}
|
||||
|
||||
|
||||
// this assumes that the state contains no other values other than those specified in the config
|
||||
def publishFilesByConfig(Map args) {
|
||||
def config = args.get("config")
|
||||
assert config != null : "publishFilesByConfig: config must be specified"
|
||||
|
||||
def key_ = args.get("key", config.name)
|
||||
assert key_ != null : "publishFilesByConfig: key must be specified"
|
||||
|
||||
workflow publishFilesSimpleWf {
|
||||
take: input_ch
|
||||
main:
|
||||
input_ch
|
||||
| map { tup ->
|
||||
def id_ = tup[0]
|
||||
def state_ = tup[1] // e.g. [output: new File("myoutput.h5ad"), k: 10]
|
||||
def origState_ = tup[2] // e.g. [output: '$id.$key.foo.h5ad']
|
||||
|
||||
|
||||
// the processed state is a list of [key, value, inputPath, outputFilename] tuples, where
|
||||
// - key is a String
|
||||
// - value is any object that can be serialized to a Yaml (so a String/Integer/Long/Double/Boolean, a List, a Map, or a Path)
|
||||
// - inputPath is a List[Path]
|
||||
// - outputFilename is a List[String]
|
||||
// - (inputPath, outputFilename) are the files that will be copied from src to dest (relative to the state.yaml)
|
||||
def processedState =
|
||||
config.allArguments
|
||||
.findAll { it.direction == "output" }
|
||||
.collectMany { par ->
|
||||
def plainName_ = par.plainName
|
||||
// if the state does not contain the key, it's an
|
||||
// optional argument for which the component did
|
||||
// not generate any output OR multiple channels were emitted
|
||||
// and the output was just not added to using the channel
|
||||
// that is now being parsed
|
||||
if (!state_.containsKey(plainName_)) {
|
||||
return []
|
||||
}
|
||||
def value = state_[plainName_]
|
||||
// if the parameter is not a file, it should be stored
|
||||
// in the state as-is, but is not something that needs
|
||||
// to be copied from the source path to the dest path
|
||||
if (par.type != "file") {
|
||||
return [[inputPath: [], outputFilename: []]]
|
||||
}
|
||||
// if the orig state does not contain this filename,
|
||||
// it's an optional argument for which the user specified
|
||||
// that it should not be returned as a state
|
||||
if (!origState_.containsKey(plainName_)) {
|
||||
return []
|
||||
}
|
||||
def filenameTemplate = origState_[plainName_]
|
||||
// if the pararameter is multiple: true, fetch the template
|
||||
if (par.multiple && filenameTemplate instanceof List) {
|
||||
filenameTemplate = filenameTemplate[0]
|
||||
}
|
||||
// instantiate the template
|
||||
def filename = filenameTemplate
|
||||
.replaceAll('\\$id', id_)
|
||||
.replaceAll('\\$\\{id\\}', id_)
|
||||
.replaceAll('\\$key', key_)
|
||||
.replaceAll('\\$\\{key\\}', key_)
|
||||
if (par.multiple) {
|
||||
// if the parameter is multiple: true, the filename
|
||||
// should contain a wildcard '*' that is replaced with
|
||||
// the index of the file
|
||||
assert filename.contains("*") : "Module '${key_}' id '${id_}': Multiple output files specified, but no wildcard '*' in the filename: ${filename}"
|
||||
def outputPerFile = value.withIndex().collect{ val, ix ->
|
||||
def filename_ix = filename.replace("*", ix.toString())
|
||||
def inputPath = val instanceof File ? val.toPath() : val
|
||||
[inputPath: inputPath, outputFilename: filename_ix]
|
||||
}
|
||||
def transposedOutputs = ["inputPath", "outputFilename"].collectEntries{ key ->
|
||||
[key, outputPerFile.collect{dic -> dic[key]}]
|
||||
}
|
||||
return [[key: plainName_] + transposedOutputs]
|
||||
} else {
|
||||
def value_ = java.nio.file.Paths.get(filename)
|
||||
def inputPath = value instanceof File ? value.toPath() : value
|
||||
return [[inputPath: [inputPath], outputFilename: [filename]]]
|
||||
}
|
||||
}
|
||||
|
||||
def inputPaths = processedState.collectMany{it.inputPath}
|
||||
def outputFilenames = processedState.collectMany{it.outputFilename}
|
||||
|
||||
|
||||
[id_, inputPaths, outputFilenames]
|
||||
}
|
||||
| publishFilesProc
|
||||
emit: input_ch
|
||||
}
|
||||
return publishFilesSimpleWf
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
// helper file: 'src/main/resources/io/viash/runners/nextflow/states/publishStates.nf'
|
||||
def collectFiles(obj) {
|
||||
if (obj instanceof java.io.File || obj instanceof Path) {
|
||||
@@ -1726,8 +1885,6 @@ def publishStates(Map args) {
|
||||
|
||||
// the input files and the target output filenames
|
||||
def inputoutputFilenames_ = collectInputOutputPaths(state_, id_ + "." + key_).transpose()
|
||||
def inputFiles_ = inputoutputFilenames_[0]
|
||||
def outputFilenames_ = inputoutputFilenames_[1]
|
||||
|
||||
def yamlFilename = yamlTemplate_
|
||||
.replaceAll('\\$id', id_)
|
||||
@@ -1740,7 +1897,7 @@ def publishStates(Map args) {
|
||||
// convert state to yaml blob
|
||||
def yamlBlob_ = toRelativeTaggedYamlBlob([id: id_] + state_, java.nio.file.Paths.get(yamlFilename))
|
||||
|
||||
[id_, yamlBlob_, yamlFilename, inputFiles_, outputFilenames_]
|
||||
[id_, yamlBlob_, yamlFilename]
|
||||
}
|
||||
| publishStatesProc
|
||||
emit: input_ch
|
||||
@@ -1752,33 +1909,17 @@ process publishStatesProc {
|
||||
publishDir path: "${getPublishDir()}/", mode: "copy"
|
||||
tag "$id"
|
||||
input:
|
||||
tuple val(id), val(yamlBlob), val(yamlFile), path(inputFiles, stageAs: "_inputfile?/*"), val(outputFiles)
|
||||
tuple val(id), val(yamlBlob), val(yamlFile)
|
||||
output:
|
||||
tuple val(id), path{[yamlFile] + outputFiles}
|
||||
tuple val(id), path{[yamlFile]}
|
||||
script:
|
||||
def copyCommands = [
|
||||
inputFiles instanceof List ? inputFiles : [inputFiles],
|
||||
outputFiles instanceof List ? outputFiles : [outputFiles]
|
||||
]
|
||||
.transpose()
|
||||
.collectMany{infile, outfile ->
|
||||
if (infile.toString() != outfile.toString()) {
|
||||
[
|
||||
"[ -d \"\$(dirname '${outfile.toString()}')\" ] || mkdir -p \"\$(dirname '${outfile.toString()}')\"",
|
||||
"cp -r '${infile.toString()}' '${outfile.toString()}'"
|
||||
]
|
||||
} else {
|
||||
// no need to copy if infile is the same as outfile
|
||||
[]
|
||||
}
|
||||
}
|
||||
"""
|
||||
mkdir -p "\$(dirname '${yamlFile}')"
|
||||
echo "Storing state as yaml"
|
||||
echo '${yamlBlob}' > '${yamlFile}'
|
||||
echo "Copying output files to destination folder"
|
||||
${copyCommands.join("\n ")}
|
||||
"""
|
||||
mkdir -p "\$(dirname '${yamlFile}')"
|
||||
echo "Storing state as yaml"
|
||||
cat > '${yamlFile}' << HERE
|
||||
${yamlBlob}
|
||||
HERE
|
||||
"""
|
||||
}
|
||||
|
||||
|
||||
@@ -1809,13 +1950,10 @@ def publishStatesByConfig(Map args) {
|
||||
.replaceAll('\\$\\{key\\}', key_)
|
||||
def yamlDir = java.nio.file.Paths.get(yamlFilename).getParent()
|
||||
|
||||
// the processed state is a list of [key, value, inputPath, outputFilename] tuples, where
|
||||
// the processed state is a list of [key, value] tuples, where
|
||||
// - key is a String
|
||||
// - value is any object that can be serialized to a Yaml (so a String/Integer/Long/Double/Boolean, a List, a Map, or a Path)
|
||||
// - inputPath is a List[Path]
|
||||
// - outputFilename is a List[String]
|
||||
// - (key, value) are the tuples that will be saved to the state.yaml file
|
||||
// - (inputPath, outputFilename) are the files that will be copied from src to dest (relative to the state.yaml)
|
||||
def processedState =
|
||||
config.allArguments
|
||||
.findAll { it.direction == "output" }
|
||||
@@ -1832,7 +1970,7 @@ def publishStatesByConfig(Map args) {
|
||||
// in the state as-is, but is not something that needs
|
||||
// to be copied from the source path to the dest path
|
||||
if (par.type != "file") {
|
||||
return [[key: plainName_, value: value, inputPath: [], outputFilename: []]]
|
||||
return [[key: plainName_, value: value]]
|
||||
}
|
||||
// if the orig state does not contain this filename,
|
||||
// it's an optional argument for which the user specified
|
||||
@@ -1863,13 +2001,9 @@ def publishStatesByConfig(Map args) {
|
||||
if (yamlDir != null) {
|
||||
value_ = yamlDir.relativize(value_)
|
||||
}
|
||||
def inputPath = val instanceof File ? val.toPath() : val
|
||||
[value: value_, inputPath: inputPath, outputFilename: filename_ix]
|
||||
return value_
|
||||
}
|
||||
def transposedOutputs = ["value", "inputPath", "outputFilename"].collectEntries{ key ->
|
||||
[key, outputPerFile.collect{dic -> dic[key]}]
|
||||
}
|
||||
return [[key: plainName_] + transposedOutputs]
|
||||
return [["key": plainName_, "value": outputPerFile]]
|
||||
} else {
|
||||
def value_ = java.nio.file.Paths.get(filename)
|
||||
// if id contains a slash
|
||||
@@ -1877,18 +2011,17 @@ def publishStatesByConfig(Map args) {
|
||||
value_ = yamlDir.relativize(value_)
|
||||
}
|
||||
def inputPath = value instanceof File ? value.toPath() : value
|
||||
return [[key: plainName_, value: value_, inputPath: [inputPath], outputFilename: [filename]]]
|
||||
return [["key": plainName_, value: value_]]
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def updatedState_ = processedState.collectEntries{[it.key, it.value]}
|
||||
def inputPaths = processedState.collectMany{it.inputPath}
|
||||
def outputFilenames = processedState.collectMany{it.outputFilename}
|
||||
|
||||
// convert state to yaml blob
|
||||
def yamlBlob_ = toTaggedYamlBlob([id: id_] + updatedState_)
|
||||
|
||||
[id_, yamlBlob_, yamlFilename, inputPaths, outputFilenames]
|
||||
[id_, yamlBlob_, yamlFilename]
|
||||
}
|
||||
| publishStatesProc
|
||||
emit: input_ch
|
||||
@@ -2562,7 +2695,8 @@ def _debug(workflowArgs, debugKey) {
|
||||
def workflowFactory(Map args, Map defaultWfArgs, Map meta) {
|
||||
def workflowArgs = processWorkflowArgs(args, defaultWfArgs, meta)
|
||||
def key_ = workflowArgs["key"]
|
||||
|
||||
def multipleArgs = meta.config.allArguments.findAll{ it.multiple }.collect{it.plainName}
|
||||
|
||||
workflow workflowInstance {
|
||||
take: input_
|
||||
|
||||
@@ -2719,12 +2853,36 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) {
|
||||
}
|
||||
|
||||
// TODO: move some of the _meta.join_id wrangling to the safeJoin() function.
|
||||
def chInitialOutput = chArgsWithDefaults
|
||||
def chInitialOutputMulti = chArgsWithDefaults
|
||||
| _debug(workflowArgs, "processed")
|
||||
// run workflow
|
||||
| innerWorkflowFactory(workflowArgs)
|
||||
// check output tuple
|
||||
| map { id_, output_ ->
|
||||
def chInitialOutputList = chInitialOutputMulti instanceof List ? chInitialOutputMulti : [chInitialOutputMulti]
|
||||
assert chInitialOutputList.size() > 0: "should have emitted at least one output channel"
|
||||
// Add a channel ID to the events, which designates the channel the event was emitted from as a running number
|
||||
// This number is used to sort the events later when the events are gathered from across the channels.
|
||||
def chInitialOutputListWithIndexedEvents = chInitialOutputList.withIndex().collect{channel, channelIndex ->
|
||||
def newChannel = channel
|
||||
| map {tuple ->
|
||||
assert tuple instanceof List :
|
||||
"Error in module '${key_}': element in output channel should be a tuple [id, data, ...otherargs...]\n" +
|
||||
" Example: [\"id\", [input: file('foo.txt'), arg: 10]].\n" +
|
||||
" Expected class: List. Found: tuple.getClass() is ${tuple.getClass()}"
|
||||
|
||||
def newEvent = [channelIndex] + tuple
|
||||
return newEvent
|
||||
}
|
||||
return newChannel
|
||||
}
|
||||
// Put the events into 1 channel, cover case where there is only one channel is emitted
|
||||
def chInitialOutput = chInitialOutputList.size() > 1 ? \
|
||||
chInitialOutputListWithIndexedEvents[0].mix(*chInitialOutputListWithIndexedEvents.tail()) : \
|
||||
chInitialOutputListWithIndexedEvents[0]
|
||||
def chInitialOutputProcessed = chInitialOutput
|
||||
| map { tuple ->
|
||||
def channelId = tuple[0]
|
||||
def id_ = tuple[1]
|
||||
def output_ = tuple[2]
|
||||
|
||||
// see if output map contains metadata
|
||||
def meta_ =
|
||||
@@ -2737,19 +2895,94 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) {
|
||||
output_ = output_.findAll{k, v -> k != "_meta"}
|
||||
|
||||
// check value types
|
||||
output_ = _processOutputValues(output_, meta.config, id_, key_)
|
||||
output_ = _checkValidOutputArgument(output_, meta.config, id_, key_)
|
||||
|
||||
// simplify output if need be
|
||||
if (workflowArgs.auto.simplifyOutput && output_.size() == 1) {
|
||||
output_ = output_.values()[0]
|
||||
}
|
||||
|
||||
[join_id, id_, output_]
|
||||
[join_id, channelId, id_, output_]
|
||||
}
|
||||
// | view{"chInitialOutput: ${it.take(3)}"}
|
||||
|
||||
// join the output [prev_id, channel_id, new_id, output] with the previous state [prev_id, state, ...]
|
||||
def chPublishWithPreviousState = safeJoin(chInitialOutputProcessed, chRunFiltered, key_)
|
||||
// input tuple format: [join_id, channel_id, id, output, prev_state, ...]
|
||||
// output tuple format: [join_id, channel_id, id, new_state, ...]
|
||||
| map{ tup ->
|
||||
def new_state = workflowArgs.toState(tup.drop(2).take(3))
|
||||
tup.take(3) + [new_state] + tup.drop(5)
|
||||
}
|
||||
if (workflowArgs.auto.publish == "state") {
|
||||
def chPublishFiles = chPublishWithPreviousState
|
||||
// input tuple format: [join_id, channel_id, id, new_state, ...]
|
||||
// output tuple format: [join_id, channel_id, id, new_state]
|
||||
| map{ tup ->
|
||||
tup.take(4)
|
||||
}
|
||||
|
||||
safeJoin(chPublishFiles, chArgsWithDefaults, key_)
|
||||
// input tuple format: [join_id, channel_id, id, new_state, orig_state, ...]
|
||||
// output tuple format: [id, new_state, orig_state]
|
||||
| map { tup ->
|
||||
tup.drop(2).take(3)
|
||||
}
|
||||
| publishFilesByConfig(key: key_, config: meta.config)
|
||||
}
|
||||
// Join the state from the events that were emitted from different channels
|
||||
def chJoined = chInitialOutputProcessed
|
||||
| map {tuple ->
|
||||
def join_id = tuple[0]
|
||||
def channel_id = tuple[1]
|
||||
def id = tuple[2]
|
||||
def other = tuple.drop(3)
|
||||
// Below, groupTuple is used to join the events. To make sure resuming a workflow
|
||||
// keeps working, the output state must be deterministic. This means the state needs to be
|
||||
// sorted with groupTuple's has a 'sort' argument. This argument can be set to 'hash',
|
||||
// but hashing the state when it is large can be problematic in terms of performance.
|
||||
// Therefore, a custom comparator function is provided. We add the channel ID to the
|
||||
// states so that we can use the channel ID to sort the items.
|
||||
def stateWithChannelID = [[channel_id] * other.size(), other].transpose()
|
||||
// A comparator that is provided to groupTuple's 'sort' argument is applied
|
||||
// to all elements of the event tuple (that is not the 'id'). The comparator
|
||||
// closure that is used below expects the input to be List. So the join_id and
|
||||
// channel_id must also be wrapped in a list.
|
||||
[[join_id], [channel_id], id] + stateWithChannelID
|
||||
}
|
||||
| groupTuple(by: 2, sort: {a, b -> a[0] <=> b[0]}, size: chInitialOutputList.size(), remainder: true)
|
||||
| map {join_ids, _, id, statesWithChannelID ->
|
||||
// Remove the channel IDs from the states
|
||||
def states = statesWithChannelID.collect{it[1]}
|
||||
def newJoinId = join_ids.flatten().unique{a, b -> a <=> b}
|
||||
assert newJoinId.size() == 1: "Multiple events were emitted for '$id'."
|
||||
def newJoinIdUnique = newJoinId[0]
|
||||
|
||||
// Merge the states from the different channels
|
||||
def newState = states.inject([:]){ old_state, state_to_add ->
|
||||
return old_state + state_to_add.collectEntries{k, v ->
|
||||
if (!multipleArgs.contains(k)) {
|
||||
// if the key is not a multiple argument, we expect only one value
|
||||
if (old_state.containsKey(k)) {
|
||||
assert old_state[k] == v : "ID $id: multiple entries for argument $k were emitted."
|
||||
}
|
||||
[k, v]
|
||||
} else {
|
||||
// if the key is a multiple argument, append the different values into one list
|
||||
def prevValue = old_state.getOrDefault(k, [])
|
||||
def prevValueAsList = prevValue instanceof List ? prevValue : [prevValue]
|
||||
[k, prevValueAsList + v]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_checkAllRequiredOuputsPresent(newState, meta.config, id, key_)
|
||||
|
||||
// simplify output if need be
|
||||
if (workflowArgs.auto.simplifyOutput && newState.size() == 1) {
|
||||
newState = newState.values()[0]
|
||||
}
|
||||
|
||||
return [newJoinIdUnique, id, newState]
|
||||
}
|
||||
|
||||
// join the output [prev_id, new_id, output] with the previous state [prev_id, state, ...]
|
||||
def chNewState = safeJoin(chInitialOutput, chRunFiltered, key_)
|
||||
def chNewState = safeJoin(chJoined, chRunFiltered, key_)
|
||||
// input tuple format: [join_id, id, output, prev_state, ...]
|
||||
// output tuple format: [join_id, id, new_state, ...]
|
||||
| map{ tup ->
|
||||
@@ -2758,23 +2991,21 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) {
|
||||
}
|
||||
|
||||
if (workflowArgs.auto.publish == "state") {
|
||||
def chPublish = chNewState
|
||||
def chPublishStates = chNewState
|
||||
// input tuple format: [join_id, id, new_state, ...]
|
||||
// output tuple format: [join_id, id, new_state]
|
||||
| map{ tup ->
|
||||
tup.take(3)
|
||||
}
|
||||
|
||||
safeJoin(chPublish, chArgsWithDefaults, key_)
|
||||
safeJoin(chPublishStates, chArgsWithDefaults, key_)
|
||||
// input tuple format: [join_id, id, new_state, orig_state, ...]
|
||||
// output tuple format: [id, new_state, orig_state]
|
||||
| map { tup ->
|
||||
tup.drop(1).take(3)
|
||||
}
|
||||
}
|
||||
| publishStatesByConfig(key: key_, config: meta.config)
|
||||
}
|
||||
|
||||
// remove join_id and meta
|
||||
chReturn = chNewState
|
||||
| map { tup ->
|
||||
// input tuple format: [join_id, id, new_state, ...]
|
||||
@@ -3488,6 +3719,10 @@ meta = [
|
||||
}
|
||||
],
|
||||
"status" : "enabled",
|
||||
"scope" : {
|
||||
"image" : "public",
|
||||
"target" : "public"
|
||||
},
|
||||
"requirements" : {
|
||||
"commands" : [
|
||||
"ps"
|
||||
@@ -3618,16 +3853,16 @@ meta = [
|
||||
"runner" : "nextflow",
|
||||
"engine" : "docker|native",
|
||||
"output" : "target/nextflow/cutadapt",
|
||||
"viash_version" : "0.9.0",
|
||||
"git_commit" : "952ff0843093b538cbfd6fefdecf2e7a0bc9e70b",
|
||||
"git_remote" : "https://x-access-token:ghs_EwAUAMYJ0K4VBHlAEMs4ZP2OyQYqJM0PSfEO@github.com/viash-hub/biobox",
|
||||
"git_tag" : "v0.2.0-27-g952ff08"
|
||||
"viash_version" : "0.9.2",
|
||||
"git_commit" : "5f6516e9c0d95c84f3d4159a67d3de19d3ae1fde",
|
||||
"git_remote" : "https://github.com/viash-hub/biobox",
|
||||
"git_tag" : "v0.2.0-30-g5f6516e"
|
||||
},
|
||||
"package_config" : {
|
||||
"name" : "biobox",
|
||||
"version" : "main",
|
||||
"description" : "A collection of bioinformatics tools for working with sequence data.\n",
|
||||
"viash_version" : "0.9.0",
|
||||
"viash_version" : "0.9.2",
|
||||
"source" : "src",
|
||||
"target" : "target",
|
||||
"config_mods" : [
|
||||
|
||||
@@ -203,6 +203,9 @@ test_resources:
|
||||
is_executable: true
|
||||
info: null
|
||||
status: "enabled"
|
||||
scope:
|
||||
image: "public"
|
||||
target: "public"
|
||||
requirements:
|
||||
commands:
|
||||
- "ps"
|
||||
@@ -316,16 +319,16 @@ build_info:
|
||||
engine: "docker|native"
|
||||
output: "target/nextflow/falco"
|
||||
executable: "target/nextflow/falco/main.nf"
|
||||
viash_version: "0.9.0"
|
||||
git_commit: "952ff0843093b538cbfd6fefdecf2e7a0bc9e70b"
|
||||
git_remote: "https://x-access-token:ghs_EwAUAMYJ0K4VBHlAEMs4ZP2OyQYqJM0PSfEO@github.com/viash-hub/biobox"
|
||||
git_tag: "v0.2.0-27-g952ff08"
|
||||
viash_version: "0.9.2"
|
||||
git_commit: "5f6516e9c0d95c84f3d4159a67d3de19d3ae1fde"
|
||||
git_remote: "https://github.com/viash-hub/biobox"
|
||||
git_tag: "v0.2.0-30-g5f6516e"
|
||||
package_config:
|
||||
name: "biobox"
|
||||
version: "main"
|
||||
description: "A collection of bioinformatics tools for working with sequence data.\n"
|
||||
info: null
|
||||
viash_version: "0.9.0"
|
||||
viash_version: "0.9.2"
|
||||
source: "src"
|
||||
target: "target"
|
||||
config_mods:
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
// falco main
|
||||
//
|
||||
// This wrapper script is auto-generated by viash 0.9.0 and is thus a derivative
|
||||
// This wrapper script is auto-generated by viash 0.9.2 and is thus a derivative
|
||||
// work thereof. This software comes with ABSOLUTELY NO WARRANTY from Data
|
||||
// Intuitive.
|
||||
//
|
||||
@@ -176,7 +176,7 @@ def _checkArgumentType(String stage, Map par, Object value, String errorIdentifi
|
||||
Map _processInputValues(Map inputs, Map config, String id, String key) {
|
||||
if (!workflow.stubRun) {
|
||||
config.allArguments.each { arg ->
|
||||
if (arg.required) {
|
||||
if (arg.required && arg.direction == "input") {
|
||||
assert inputs.containsKey(arg.plainName) && inputs.get(arg.plainName) != null :
|
||||
"Error in module '${key}' id '${id}': required input argument '${arg.plainName}' is missing"
|
||||
}
|
||||
@@ -195,15 +195,8 @@ Map _processInputValues(Map inputs, Map config, String id, String key) {
|
||||
}
|
||||
|
||||
// helper file: 'src/main/resources/io/viash/runners/nextflow/arguments/_processOutputValues.nf'
|
||||
Map _processOutputValues(Map outputs, Map config, String id, String key) {
|
||||
Map _checkValidOutputArgument(Map outputs, Map config, String id, String key) {
|
||||
if (!workflow.stubRun) {
|
||||
config.allArguments.each { arg ->
|
||||
if (arg.direction == "output" && arg.required) {
|
||||
assert outputs.containsKey(arg.plainName) && outputs.get(arg.plainName) != null :
|
||||
"Error in module '${key}' id '${id}': required output argument '${arg.plainName}' is missing"
|
||||
}
|
||||
}
|
||||
|
||||
outputs = outputs.collectEntries { name, value ->
|
||||
def par = config.allArguments.find { it.plainName == name && it.direction == "output" }
|
||||
assert par != null : "Error in module '${key}' id '${id}': '${name}' is not a valid output argument"
|
||||
@@ -216,6 +209,16 @@ Map _processOutputValues(Map outputs, Map config, String id, String key) {
|
||||
return outputs
|
||||
}
|
||||
|
||||
void _checkAllRequiredOuputsPresent(Map outputs, Map config, String id, String key) {
|
||||
if (!workflow.stubRun) {
|
||||
config.allArguments.each { arg ->
|
||||
if (arg.direction == "output" && arg.required) {
|
||||
assert outputs.containsKey(arg.plainName) && outputs.get(arg.plainName) != null :
|
||||
"Error in module '${key}' id '${id}': required output argument '${arg.plainName}' is missing"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// helper file: 'src/main/resources/io/viash/runners/nextflow/channel/IDChecker.nf'
|
||||
class IDChecker {
|
||||
final def items = [] as Set
|
||||
@@ -1669,6 +1672,162 @@ def joinStates(Closure apply_) {
|
||||
}
|
||||
return joinStatesWf
|
||||
}
|
||||
// helper file: 'src/main/resources/io/viash/runners/nextflow/states/publishFiles.nf'
|
||||
def publishFiles(Map args) {
|
||||
def key_ = args.get("key")
|
||||
|
||||
assert key_ != null : "publishFiles: key must be specified"
|
||||
|
||||
workflow publishFilesWf {
|
||||
take: input_ch
|
||||
main:
|
||||
input_ch
|
||||
| map { tup ->
|
||||
def id_ = tup[0]
|
||||
def state_ = tup[1]
|
||||
|
||||
// the input files and the target output filenames
|
||||
def inputoutputFilenames_ = collectInputOutputPaths(state_, id_ + "." + key_).transpose()
|
||||
def inputFiles_ = inputoutputFilenames_[0]
|
||||
def outputFilenames_ = inputoutputFilenames_[1]
|
||||
|
||||
[id_, inputFiles_, outputFilenames_]
|
||||
}
|
||||
| publishFilesProc
|
||||
emit: input_ch
|
||||
}
|
||||
return publishFilesWf
|
||||
}
|
||||
|
||||
process publishFilesProc {
|
||||
// todo: check publishpath?
|
||||
publishDir path: "${getPublishDir()}/", mode: "copy"
|
||||
tag "$id"
|
||||
input:
|
||||
tuple val(id), path(inputFiles, stageAs: "_inputfile?/*"), val(outputFiles)
|
||||
output:
|
||||
tuple val(id), path{outputFiles}
|
||||
script:
|
||||
def copyCommands = [
|
||||
inputFiles instanceof List ? inputFiles : [inputFiles],
|
||||
outputFiles instanceof List ? outputFiles : [outputFiles]
|
||||
]
|
||||
.transpose()
|
||||
.collectMany{infile, outfile ->
|
||||
if (infile.toString() != outfile.toString()) {
|
||||
[
|
||||
"[ -d \"\$(dirname '${outfile.toString()}')\" ] || mkdir -p \"\$(dirname '${outfile.toString()}')\"",
|
||||
"cp -r '${infile.toString()}' '${outfile.toString()}'"
|
||||
]
|
||||
} else {
|
||||
// no need to copy if infile is the same as outfile
|
||||
[]
|
||||
}
|
||||
}
|
||||
"""
|
||||
echo "Copying output files to destination folder"
|
||||
${copyCommands.join("\n ")}
|
||||
"""
|
||||
}
|
||||
|
||||
|
||||
// this assumes that the state contains no other values other than those specified in the config
|
||||
def publishFilesByConfig(Map args) {
|
||||
def config = args.get("config")
|
||||
assert config != null : "publishFilesByConfig: config must be specified"
|
||||
|
||||
def key_ = args.get("key", config.name)
|
||||
assert key_ != null : "publishFilesByConfig: key must be specified"
|
||||
|
||||
workflow publishFilesSimpleWf {
|
||||
take: input_ch
|
||||
main:
|
||||
input_ch
|
||||
| map { tup ->
|
||||
def id_ = tup[0]
|
||||
def state_ = tup[1] // e.g. [output: new File("myoutput.h5ad"), k: 10]
|
||||
def origState_ = tup[2] // e.g. [output: '$id.$key.foo.h5ad']
|
||||
|
||||
|
||||
// the processed state is a list of [key, value, inputPath, outputFilename] tuples, where
|
||||
// - key is a String
|
||||
// - value is any object that can be serialized to a Yaml (so a String/Integer/Long/Double/Boolean, a List, a Map, or a Path)
|
||||
// - inputPath is a List[Path]
|
||||
// - outputFilename is a List[String]
|
||||
// - (inputPath, outputFilename) are the files that will be copied from src to dest (relative to the state.yaml)
|
||||
def processedState =
|
||||
config.allArguments
|
||||
.findAll { it.direction == "output" }
|
||||
.collectMany { par ->
|
||||
def plainName_ = par.plainName
|
||||
// if the state does not contain the key, it's an
|
||||
// optional argument for which the component did
|
||||
// not generate any output OR multiple channels were emitted
|
||||
// and the output was just not added to using the channel
|
||||
// that is now being parsed
|
||||
if (!state_.containsKey(plainName_)) {
|
||||
return []
|
||||
}
|
||||
def value = state_[plainName_]
|
||||
// if the parameter is not a file, it should be stored
|
||||
// in the state as-is, but is not something that needs
|
||||
// to be copied from the source path to the dest path
|
||||
if (par.type != "file") {
|
||||
return [[inputPath: [], outputFilename: []]]
|
||||
}
|
||||
// if the orig state does not contain this filename,
|
||||
// it's an optional argument for which the user specified
|
||||
// that it should not be returned as a state
|
||||
if (!origState_.containsKey(plainName_)) {
|
||||
return []
|
||||
}
|
||||
def filenameTemplate = origState_[plainName_]
|
||||
// if the pararameter is multiple: true, fetch the template
|
||||
if (par.multiple && filenameTemplate instanceof List) {
|
||||
filenameTemplate = filenameTemplate[0]
|
||||
}
|
||||
// instantiate the template
|
||||
def filename = filenameTemplate
|
||||
.replaceAll('\\$id', id_)
|
||||
.replaceAll('\\$\\{id\\}', id_)
|
||||
.replaceAll('\\$key', key_)
|
||||
.replaceAll('\\$\\{key\\}', key_)
|
||||
if (par.multiple) {
|
||||
// if the parameter is multiple: true, the filename
|
||||
// should contain a wildcard '*' that is replaced with
|
||||
// the index of the file
|
||||
assert filename.contains("*") : "Module '${key_}' id '${id_}': Multiple output files specified, but no wildcard '*' in the filename: ${filename}"
|
||||
def outputPerFile = value.withIndex().collect{ val, ix ->
|
||||
def filename_ix = filename.replace("*", ix.toString())
|
||||
def inputPath = val instanceof File ? val.toPath() : val
|
||||
[inputPath: inputPath, outputFilename: filename_ix]
|
||||
}
|
||||
def transposedOutputs = ["inputPath", "outputFilename"].collectEntries{ key ->
|
||||
[key, outputPerFile.collect{dic -> dic[key]}]
|
||||
}
|
||||
return [[key: plainName_] + transposedOutputs]
|
||||
} else {
|
||||
def value_ = java.nio.file.Paths.get(filename)
|
||||
def inputPath = value instanceof File ? value.toPath() : value
|
||||
return [[inputPath: [inputPath], outputFilename: [filename]]]
|
||||
}
|
||||
}
|
||||
|
||||
def inputPaths = processedState.collectMany{it.inputPath}
|
||||
def outputFilenames = processedState.collectMany{it.outputFilename}
|
||||
|
||||
|
||||
[id_, inputPaths, outputFilenames]
|
||||
}
|
||||
| publishFilesProc
|
||||
emit: input_ch
|
||||
}
|
||||
return publishFilesSimpleWf
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
// helper file: 'src/main/resources/io/viash/runners/nextflow/states/publishStates.nf'
|
||||
def collectFiles(obj) {
|
||||
if (obj instanceof java.io.File || obj instanceof Path) {
|
||||
@@ -1726,8 +1885,6 @@ def publishStates(Map args) {
|
||||
|
||||
// the input files and the target output filenames
|
||||
def inputoutputFilenames_ = collectInputOutputPaths(state_, id_ + "." + key_).transpose()
|
||||
def inputFiles_ = inputoutputFilenames_[0]
|
||||
def outputFilenames_ = inputoutputFilenames_[1]
|
||||
|
||||
def yamlFilename = yamlTemplate_
|
||||
.replaceAll('\\$id', id_)
|
||||
@@ -1740,7 +1897,7 @@ def publishStates(Map args) {
|
||||
// convert state to yaml blob
|
||||
def yamlBlob_ = toRelativeTaggedYamlBlob([id: id_] + state_, java.nio.file.Paths.get(yamlFilename))
|
||||
|
||||
[id_, yamlBlob_, yamlFilename, inputFiles_, outputFilenames_]
|
||||
[id_, yamlBlob_, yamlFilename]
|
||||
}
|
||||
| publishStatesProc
|
||||
emit: input_ch
|
||||
@@ -1752,33 +1909,17 @@ process publishStatesProc {
|
||||
publishDir path: "${getPublishDir()}/", mode: "copy"
|
||||
tag "$id"
|
||||
input:
|
||||
tuple val(id), val(yamlBlob), val(yamlFile), path(inputFiles, stageAs: "_inputfile?/*"), val(outputFiles)
|
||||
tuple val(id), val(yamlBlob), val(yamlFile)
|
||||
output:
|
||||
tuple val(id), path{[yamlFile] + outputFiles}
|
||||
tuple val(id), path{[yamlFile]}
|
||||
script:
|
||||
def copyCommands = [
|
||||
inputFiles instanceof List ? inputFiles : [inputFiles],
|
||||
outputFiles instanceof List ? outputFiles : [outputFiles]
|
||||
]
|
||||
.transpose()
|
||||
.collectMany{infile, outfile ->
|
||||
if (infile.toString() != outfile.toString()) {
|
||||
[
|
||||
"[ -d \"\$(dirname '${outfile.toString()}')\" ] || mkdir -p \"\$(dirname '${outfile.toString()}')\"",
|
||||
"cp -r '${infile.toString()}' '${outfile.toString()}'"
|
||||
]
|
||||
} else {
|
||||
// no need to copy if infile is the same as outfile
|
||||
[]
|
||||
}
|
||||
}
|
||||
"""
|
||||
mkdir -p "\$(dirname '${yamlFile}')"
|
||||
echo "Storing state as yaml"
|
||||
echo '${yamlBlob}' > '${yamlFile}'
|
||||
echo "Copying output files to destination folder"
|
||||
${copyCommands.join("\n ")}
|
||||
"""
|
||||
mkdir -p "\$(dirname '${yamlFile}')"
|
||||
echo "Storing state as yaml"
|
||||
cat > '${yamlFile}' << HERE
|
||||
${yamlBlob}
|
||||
HERE
|
||||
"""
|
||||
}
|
||||
|
||||
|
||||
@@ -1809,13 +1950,10 @@ def publishStatesByConfig(Map args) {
|
||||
.replaceAll('\\$\\{key\\}', key_)
|
||||
def yamlDir = java.nio.file.Paths.get(yamlFilename).getParent()
|
||||
|
||||
// the processed state is a list of [key, value, inputPath, outputFilename] tuples, where
|
||||
// the processed state is a list of [key, value] tuples, where
|
||||
// - key is a String
|
||||
// - value is any object that can be serialized to a Yaml (so a String/Integer/Long/Double/Boolean, a List, a Map, or a Path)
|
||||
// - inputPath is a List[Path]
|
||||
// - outputFilename is a List[String]
|
||||
// - (key, value) are the tuples that will be saved to the state.yaml file
|
||||
// - (inputPath, outputFilename) are the files that will be copied from src to dest (relative to the state.yaml)
|
||||
def processedState =
|
||||
config.allArguments
|
||||
.findAll { it.direction == "output" }
|
||||
@@ -1832,7 +1970,7 @@ def publishStatesByConfig(Map args) {
|
||||
// in the state as-is, but is not something that needs
|
||||
// to be copied from the source path to the dest path
|
||||
if (par.type != "file") {
|
||||
return [[key: plainName_, value: value, inputPath: [], outputFilename: []]]
|
||||
return [[key: plainName_, value: value]]
|
||||
}
|
||||
// if the orig state does not contain this filename,
|
||||
// it's an optional argument for which the user specified
|
||||
@@ -1863,13 +2001,9 @@ def publishStatesByConfig(Map args) {
|
||||
if (yamlDir != null) {
|
||||
value_ = yamlDir.relativize(value_)
|
||||
}
|
||||
def inputPath = val instanceof File ? val.toPath() : val
|
||||
[value: value_, inputPath: inputPath, outputFilename: filename_ix]
|
||||
return value_
|
||||
}
|
||||
def transposedOutputs = ["value", "inputPath", "outputFilename"].collectEntries{ key ->
|
||||
[key, outputPerFile.collect{dic -> dic[key]}]
|
||||
}
|
||||
return [[key: plainName_] + transposedOutputs]
|
||||
return [["key": plainName_, "value": outputPerFile]]
|
||||
} else {
|
||||
def value_ = java.nio.file.Paths.get(filename)
|
||||
// if id contains a slash
|
||||
@@ -1877,18 +2011,17 @@ def publishStatesByConfig(Map args) {
|
||||
value_ = yamlDir.relativize(value_)
|
||||
}
|
||||
def inputPath = value instanceof File ? value.toPath() : value
|
||||
return [[key: plainName_, value: value_, inputPath: [inputPath], outputFilename: [filename]]]
|
||||
return [["key": plainName_, value: value_]]
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def updatedState_ = processedState.collectEntries{[it.key, it.value]}
|
||||
def inputPaths = processedState.collectMany{it.inputPath}
|
||||
def outputFilenames = processedState.collectMany{it.outputFilename}
|
||||
|
||||
// convert state to yaml blob
|
||||
def yamlBlob_ = toTaggedYamlBlob([id: id_] + updatedState_)
|
||||
|
||||
[id_, yamlBlob_, yamlFilename, inputPaths, outputFilenames]
|
||||
[id_, yamlBlob_, yamlFilename]
|
||||
}
|
||||
| publishStatesProc
|
||||
emit: input_ch
|
||||
@@ -2562,7 +2695,8 @@ def _debug(workflowArgs, debugKey) {
|
||||
def workflowFactory(Map args, Map defaultWfArgs, Map meta) {
|
||||
def workflowArgs = processWorkflowArgs(args, defaultWfArgs, meta)
|
||||
def key_ = workflowArgs["key"]
|
||||
|
||||
def multipleArgs = meta.config.allArguments.findAll{ it.multiple }.collect{it.plainName}
|
||||
|
||||
workflow workflowInstance {
|
||||
take: input_
|
||||
|
||||
@@ -2719,12 +2853,36 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) {
|
||||
}
|
||||
|
||||
// TODO: move some of the _meta.join_id wrangling to the safeJoin() function.
|
||||
def chInitialOutput = chArgsWithDefaults
|
||||
def chInitialOutputMulti = chArgsWithDefaults
|
||||
| _debug(workflowArgs, "processed")
|
||||
// run workflow
|
||||
| innerWorkflowFactory(workflowArgs)
|
||||
// check output tuple
|
||||
| map { id_, output_ ->
|
||||
def chInitialOutputList = chInitialOutputMulti instanceof List ? chInitialOutputMulti : [chInitialOutputMulti]
|
||||
assert chInitialOutputList.size() > 0: "should have emitted at least one output channel"
|
||||
// Add a channel ID to the events, which designates the channel the event was emitted from as a running number
|
||||
// This number is used to sort the events later when the events are gathered from across the channels.
|
||||
def chInitialOutputListWithIndexedEvents = chInitialOutputList.withIndex().collect{channel, channelIndex ->
|
||||
def newChannel = channel
|
||||
| map {tuple ->
|
||||
assert tuple instanceof List :
|
||||
"Error in module '${key_}': element in output channel should be a tuple [id, data, ...otherargs...]\n" +
|
||||
" Example: [\"id\", [input: file('foo.txt'), arg: 10]].\n" +
|
||||
" Expected class: List. Found: tuple.getClass() is ${tuple.getClass()}"
|
||||
|
||||
def newEvent = [channelIndex] + tuple
|
||||
return newEvent
|
||||
}
|
||||
return newChannel
|
||||
}
|
||||
// Put the events into 1 channel, cover case where there is only one channel is emitted
|
||||
def chInitialOutput = chInitialOutputList.size() > 1 ? \
|
||||
chInitialOutputListWithIndexedEvents[0].mix(*chInitialOutputListWithIndexedEvents.tail()) : \
|
||||
chInitialOutputListWithIndexedEvents[0]
|
||||
def chInitialOutputProcessed = chInitialOutput
|
||||
| map { tuple ->
|
||||
def channelId = tuple[0]
|
||||
def id_ = tuple[1]
|
||||
def output_ = tuple[2]
|
||||
|
||||
// see if output map contains metadata
|
||||
def meta_ =
|
||||
@@ -2737,19 +2895,94 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) {
|
||||
output_ = output_.findAll{k, v -> k != "_meta"}
|
||||
|
||||
// check value types
|
||||
output_ = _processOutputValues(output_, meta.config, id_, key_)
|
||||
output_ = _checkValidOutputArgument(output_, meta.config, id_, key_)
|
||||
|
||||
// simplify output if need be
|
||||
if (workflowArgs.auto.simplifyOutput && output_.size() == 1) {
|
||||
output_ = output_.values()[0]
|
||||
}
|
||||
|
||||
[join_id, id_, output_]
|
||||
[join_id, channelId, id_, output_]
|
||||
}
|
||||
// | view{"chInitialOutput: ${it.take(3)}"}
|
||||
|
||||
// join the output [prev_id, channel_id, new_id, output] with the previous state [prev_id, state, ...]
|
||||
def chPublishWithPreviousState = safeJoin(chInitialOutputProcessed, chRunFiltered, key_)
|
||||
// input tuple format: [join_id, channel_id, id, output, prev_state, ...]
|
||||
// output tuple format: [join_id, channel_id, id, new_state, ...]
|
||||
| map{ tup ->
|
||||
def new_state = workflowArgs.toState(tup.drop(2).take(3))
|
||||
tup.take(3) + [new_state] + tup.drop(5)
|
||||
}
|
||||
if (workflowArgs.auto.publish == "state") {
|
||||
def chPublishFiles = chPublishWithPreviousState
|
||||
// input tuple format: [join_id, channel_id, id, new_state, ...]
|
||||
// output tuple format: [join_id, channel_id, id, new_state]
|
||||
| map{ tup ->
|
||||
tup.take(4)
|
||||
}
|
||||
|
||||
safeJoin(chPublishFiles, chArgsWithDefaults, key_)
|
||||
// input tuple format: [join_id, channel_id, id, new_state, orig_state, ...]
|
||||
// output tuple format: [id, new_state, orig_state]
|
||||
| map { tup ->
|
||||
tup.drop(2).take(3)
|
||||
}
|
||||
| publishFilesByConfig(key: key_, config: meta.config)
|
||||
}
|
||||
// Join the state from the events that were emitted from different channels
|
||||
def chJoined = chInitialOutputProcessed
|
||||
| map {tuple ->
|
||||
def join_id = tuple[0]
|
||||
def channel_id = tuple[1]
|
||||
def id = tuple[2]
|
||||
def other = tuple.drop(3)
|
||||
// Below, groupTuple is used to join the events. To make sure resuming a workflow
|
||||
// keeps working, the output state must be deterministic. This means the state needs to be
|
||||
// sorted with groupTuple's has a 'sort' argument. This argument can be set to 'hash',
|
||||
// but hashing the state when it is large can be problematic in terms of performance.
|
||||
// Therefore, a custom comparator function is provided. We add the channel ID to the
|
||||
// states so that we can use the channel ID to sort the items.
|
||||
def stateWithChannelID = [[channel_id] * other.size(), other].transpose()
|
||||
// A comparator that is provided to groupTuple's 'sort' argument is applied
|
||||
// to all elements of the event tuple (that is not the 'id'). The comparator
|
||||
// closure that is used below expects the input to be List. So the join_id and
|
||||
// channel_id must also be wrapped in a list.
|
||||
[[join_id], [channel_id], id] + stateWithChannelID
|
||||
}
|
||||
| groupTuple(by: 2, sort: {a, b -> a[0] <=> b[0]}, size: chInitialOutputList.size(), remainder: true)
|
||||
| map {join_ids, _, id, statesWithChannelID ->
|
||||
// Remove the channel IDs from the states
|
||||
def states = statesWithChannelID.collect{it[1]}
|
||||
def newJoinId = join_ids.flatten().unique{a, b -> a <=> b}
|
||||
assert newJoinId.size() == 1: "Multiple events were emitted for '$id'."
|
||||
def newJoinIdUnique = newJoinId[0]
|
||||
|
||||
// Merge the states from the different channels
|
||||
def newState = states.inject([:]){ old_state, state_to_add ->
|
||||
return old_state + state_to_add.collectEntries{k, v ->
|
||||
if (!multipleArgs.contains(k)) {
|
||||
// if the key is not a multiple argument, we expect only one value
|
||||
if (old_state.containsKey(k)) {
|
||||
assert old_state[k] == v : "ID $id: multiple entries for argument $k were emitted."
|
||||
}
|
||||
[k, v]
|
||||
} else {
|
||||
// if the key is a multiple argument, append the different values into one list
|
||||
def prevValue = old_state.getOrDefault(k, [])
|
||||
def prevValueAsList = prevValue instanceof List ? prevValue : [prevValue]
|
||||
[k, prevValueAsList + v]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_checkAllRequiredOuputsPresent(newState, meta.config, id, key_)
|
||||
|
||||
// simplify output if need be
|
||||
if (workflowArgs.auto.simplifyOutput && newState.size() == 1) {
|
||||
newState = newState.values()[0]
|
||||
}
|
||||
|
||||
return [newJoinIdUnique, id, newState]
|
||||
}
|
||||
|
||||
// join the output [prev_id, new_id, output] with the previous state [prev_id, state, ...]
|
||||
def chNewState = safeJoin(chInitialOutput, chRunFiltered, key_)
|
||||
def chNewState = safeJoin(chJoined, chRunFiltered, key_)
|
||||
// input tuple format: [join_id, id, output, prev_state, ...]
|
||||
// output tuple format: [join_id, id, new_state, ...]
|
||||
| map{ tup ->
|
||||
@@ -2758,23 +2991,21 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) {
|
||||
}
|
||||
|
||||
if (workflowArgs.auto.publish == "state") {
|
||||
def chPublish = chNewState
|
||||
def chPublishStates = chNewState
|
||||
// input tuple format: [join_id, id, new_state, ...]
|
||||
// output tuple format: [join_id, id, new_state]
|
||||
| map{ tup ->
|
||||
tup.take(3)
|
||||
}
|
||||
|
||||
safeJoin(chPublish, chArgsWithDefaults, key_)
|
||||
safeJoin(chPublishStates, chArgsWithDefaults, key_)
|
||||
// input tuple format: [join_id, id, new_state, orig_state, ...]
|
||||
// output tuple format: [id, new_state, orig_state]
|
||||
| map { tup ->
|
||||
tup.drop(1).take(3)
|
||||
}
|
||||
}
|
||||
| publishStatesByConfig(key: key_, config: meta.config)
|
||||
}
|
||||
|
||||
// remove join_id and meta
|
||||
chReturn = chNewState
|
||||
| map { tup ->
|
||||
// input tuple format: [join_id, id, new_state, ...]
|
||||
@@ -3031,6 +3262,10 @@ meta = [
|
||||
}
|
||||
],
|
||||
"status" : "enabled",
|
||||
"scope" : {
|
||||
"image" : "public",
|
||||
"target" : "public"
|
||||
},
|
||||
"requirements" : {
|
||||
"commands" : [
|
||||
"ps"
|
||||
@@ -3169,16 +3404,16 @@ meta = [
|
||||
"runner" : "nextflow",
|
||||
"engine" : "docker|native",
|
||||
"output" : "target/nextflow/falco",
|
||||
"viash_version" : "0.9.0",
|
||||
"git_commit" : "952ff0843093b538cbfd6fefdecf2e7a0bc9e70b",
|
||||
"git_remote" : "https://x-access-token:ghs_EwAUAMYJ0K4VBHlAEMs4ZP2OyQYqJM0PSfEO@github.com/viash-hub/biobox",
|
||||
"git_tag" : "v0.2.0-27-g952ff08"
|
||||
"viash_version" : "0.9.2",
|
||||
"git_commit" : "5f6516e9c0d95c84f3d4159a67d3de19d3ae1fde",
|
||||
"git_remote" : "https://github.com/viash-hub/biobox",
|
||||
"git_tag" : "v0.2.0-30-g5f6516e"
|
||||
},
|
||||
"package_config" : {
|
||||
"name" : "biobox",
|
||||
"version" : "main",
|
||||
"description" : "A collection of bioinformatics tools for working with sequence data.\n",
|
||||
"viash_version" : "0.9.0",
|
||||
"viash_version" : "0.9.2",
|
||||
"source" : "src",
|
||||
"target" : "target",
|
||||
"config_mods" : [
|
||||
|
||||
@@ -357,6 +357,9 @@ info:
|
||||
doi: "10.1093/bioinformatics/btw354"
|
||||
licence: "GPL v3 or later"
|
||||
status: "enabled"
|
||||
scope:
|
||||
image: "public"
|
||||
target: "public"
|
||||
requirements:
|
||||
commands:
|
||||
- "ps"
|
||||
@@ -455,16 +458,16 @@ build_info:
|
||||
engine: "docker|native"
|
||||
output: "target/nextflow/multiqc"
|
||||
executable: "target/nextflow/multiqc/main.nf"
|
||||
viash_version: "0.9.0"
|
||||
git_commit: "952ff0843093b538cbfd6fefdecf2e7a0bc9e70b"
|
||||
git_remote: "https://x-access-token:ghs_EwAUAMYJ0K4VBHlAEMs4ZP2OyQYqJM0PSfEO@github.com/viash-hub/biobox"
|
||||
git_tag: "v0.2.0-27-g952ff08"
|
||||
viash_version: "0.9.2"
|
||||
git_commit: "5f6516e9c0d95c84f3d4159a67d3de19d3ae1fde"
|
||||
git_remote: "https://github.com/viash-hub/biobox"
|
||||
git_tag: "v0.2.0-30-g5f6516e"
|
||||
package_config:
|
||||
name: "biobox"
|
||||
version: "main"
|
||||
description: "A collection of bioinformatics tools for working with sequence data.\n"
|
||||
info: null
|
||||
viash_version: "0.9.0"
|
||||
viash_version: "0.9.2"
|
||||
source: "src"
|
||||
target: "target"
|
||||
config_mods:
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
// multiqc main
|
||||
//
|
||||
// This wrapper script is auto-generated by viash 0.9.0 and is thus a derivative
|
||||
// This wrapper script is auto-generated by viash 0.9.2 and is thus a derivative
|
||||
// work thereof. This software comes with ABSOLUTELY NO WARRANTY from Data
|
||||
// Intuitive.
|
||||
//
|
||||
@@ -176,7 +176,7 @@ def _checkArgumentType(String stage, Map par, Object value, String errorIdentifi
|
||||
Map _processInputValues(Map inputs, Map config, String id, String key) {
|
||||
if (!workflow.stubRun) {
|
||||
config.allArguments.each { arg ->
|
||||
if (arg.required) {
|
||||
if (arg.required && arg.direction == "input") {
|
||||
assert inputs.containsKey(arg.plainName) && inputs.get(arg.plainName) != null :
|
||||
"Error in module '${key}' id '${id}': required input argument '${arg.plainName}' is missing"
|
||||
}
|
||||
@@ -195,15 +195,8 @@ Map _processInputValues(Map inputs, Map config, String id, String key) {
|
||||
}
|
||||
|
||||
// helper file: 'src/main/resources/io/viash/runners/nextflow/arguments/_processOutputValues.nf'
|
||||
Map _processOutputValues(Map outputs, Map config, String id, String key) {
|
||||
Map _checkValidOutputArgument(Map outputs, Map config, String id, String key) {
|
||||
if (!workflow.stubRun) {
|
||||
config.allArguments.each { arg ->
|
||||
if (arg.direction == "output" && arg.required) {
|
||||
assert outputs.containsKey(arg.plainName) && outputs.get(arg.plainName) != null :
|
||||
"Error in module '${key}' id '${id}': required output argument '${arg.plainName}' is missing"
|
||||
}
|
||||
}
|
||||
|
||||
outputs = outputs.collectEntries { name, value ->
|
||||
def par = config.allArguments.find { it.plainName == name && it.direction == "output" }
|
||||
assert par != null : "Error in module '${key}' id '${id}': '${name}' is not a valid output argument"
|
||||
@@ -216,6 +209,16 @@ Map _processOutputValues(Map outputs, Map config, String id, String key) {
|
||||
return outputs
|
||||
}
|
||||
|
||||
void _checkAllRequiredOuputsPresent(Map outputs, Map config, String id, String key) {
|
||||
if (!workflow.stubRun) {
|
||||
config.allArguments.each { arg ->
|
||||
if (arg.direction == "output" && arg.required) {
|
||||
assert outputs.containsKey(arg.plainName) && outputs.get(arg.plainName) != null :
|
||||
"Error in module '${key}' id '${id}': required output argument '${arg.plainName}' is missing"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// helper file: 'src/main/resources/io/viash/runners/nextflow/channel/IDChecker.nf'
|
||||
class IDChecker {
|
||||
final def items = [] as Set
|
||||
@@ -1669,6 +1672,162 @@ def joinStates(Closure apply_) {
|
||||
}
|
||||
return joinStatesWf
|
||||
}
|
||||
// helper file: 'src/main/resources/io/viash/runners/nextflow/states/publishFiles.nf'
|
||||
def publishFiles(Map args) {
|
||||
def key_ = args.get("key")
|
||||
|
||||
assert key_ != null : "publishFiles: key must be specified"
|
||||
|
||||
workflow publishFilesWf {
|
||||
take: input_ch
|
||||
main:
|
||||
input_ch
|
||||
| map { tup ->
|
||||
def id_ = tup[0]
|
||||
def state_ = tup[1]
|
||||
|
||||
// the input files and the target output filenames
|
||||
def inputoutputFilenames_ = collectInputOutputPaths(state_, id_ + "." + key_).transpose()
|
||||
def inputFiles_ = inputoutputFilenames_[0]
|
||||
def outputFilenames_ = inputoutputFilenames_[1]
|
||||
|
||||
[id_, inputFiles_, outputFilenames_]
|
||||
}
|
||||
| publishFilesProc
|
||||
emit: input_ch
|
||||
}
|
||||
return publishFilesWf
|
||||
}
|
||||
|
||||
process publishFilesProc {
|
||||
// todo: check publishpath?
|
||||
publishDir path: "${getPublishDir()}/", mode: "copy"
|
||||
tag "$id"
|
||||
input:
|
||||
tuple val(id), path(inputFiles, stageAs: "_inputfile?/*"), val(outputFiles)
|
||||
output:
|
||||
tuple val(id), path{outputFiles}
|
||||
script:
|
||||
def copyCommands = [
|
||||
inputFiles instanceof List ? inputFiles : [inputFiles],
|
||||
outputFiles instanceof List ? outputFiles : [outputFiles]
|
||||
]
|
||||
.transpose()
|
||||
.collectMany{infile, outfile ->
|
||||
if (infile.toString() != outfile.toString()) {
|
||||
[
|
||||
"[ -d \"\$(dirname '${outfile.toString()}')\" ] || mkdir -p \"\$(dirname '${outfile.toString()}')\"",
|
||||
"cp -r '${infile.toString()}' '${outfile.toString()}'"
|
||||
]
|
||||
} else {
|
||||
// no need to copy if infile is the same as outfile
|
||||
[]
|
||||
}
|
||||
}
|
||||
"""
|
||||
echo "Copying output files to destination folder"
|
||||
${copyCommands.join("\n ")}
|
||||
"""
|
||||
}
|
||||
|
||||
|
||||
// this assumes that the state contains no other values other than those specified in the config
|
||||
def publishFilesByConfig(Map args) {
|
||||
def config = args.get("config")
|
||||
assert config != null : "publishFilesByConfig: config must be specified"
|
||||
|
||||
def key_ = args.get("key", config.name)
|
||||
assert key_ != null : "publishFilesByConfig: key must be specified"
|
||||
|
||||
workflow publishFilesSimpleWf {
|
||||
take: input_ch
|
||||
main:
|
||||
input_ch
|
||||
| map { tup ->
|
||||
def id_ = tup[0]
|
||||
def state_ = tup[1] // e.g. [output: new File("myoutput.h5ad"), k: 10]
|
||||
def origState_ = tup[2] // e.g. [output: '$id.$key.foo.h5ad']
|
||||
|
||||
|
||||
// the processed state is a list of [key, value, inputPath, outputFilename] tuples, where
|
||||
// - key is a String
|
||||
// - value is any object that can be serialized to a Yaml (so a String/Integer/Long/Double/Boolean, a List, a Map, or a Path)
|
||||
// - inputPath is a List[Path]
|
||||
// - outputFilename is a List[String]
|
||||
// - (inputPath, outputFilename) are the files that will be copied from src to dest (relative to the state.yaml)
|
||||
def processedState =
|
||||
config.allArguments
|
||||
.findAll { it.direction == "output" }
|
||||
.collectMany { par ->
|
||||
def plainName_ = par.plainName
|
||||
// if the state does not contain the key, it's an
|
||||
// optional argument for which the component did
|
||||
// not generate any output OR multiple channels were emitted
|
||||
// and the output was just not added to using the channel
|
||||
// that is now being parsed
|
||||
if (!state_.containsKey(plainName_)) {
|
||||
return []
|
||||
}
|
||||
def value = state_[plainName_]
|
||||
// if the parameter is not a file, it should be stored
|
||||
// in the state as-is, but is not something that needs
|
||||
// to be copied from the source path to the dest path
|
||||
if (par.type != "file") {
|
||||
return [[inputPath: [], outputFilename: []]]
|
||||
}
|
||||
// if the orig state does not contain this filename,
|
||||
// it's an optional argument for which the user specified
|
||||
// that it should not be returned as a state
|
||||
if (!origState_.containsKey(plainName_)) {
|
||||
return []
|
||||
}
|
||||
def filenameTemplate = origState_[plainName_]
|
||||
// if the pararameter is multiple: true, fetch the template
|
||||
if (par.multiple && filenameTemplate instanceof List) {
|
||||
filenameTemplate = filenameTemplate[0]
|
||||
}
|
||||
// instantiate the template
|
||||
def filename = filenameTemplate
|
||||
.replaceAll('\\$id', id_)
|
||||
.replaceAll('\\$\\{id\\}', id_)
|
||||
.replaceAll('\\$key', key_)
|
||||
.replaceAll('\\$\\{key\\}', key_)
|
||||
if (par.multiple) {
|
||||
// if the parameter is multiple: true, the filename
|
||||
// should contain a wildcard '*' that is replaced with
|
||||
// the index of the file
|
||||
assert filename.contains("*") : "Module '${key_}' id '${id_}': Multiple output files specified, but no wildcard '*' in the filename: ${filename}"
|
||||
def outputPerFile = value.withIndex().collect{ val, ix ->
|
||||
def filename_ix = filename.replace("*", ix.toString())
|
||||
def inputPath = val instanceof File ? val.toPath() : val
|
||||
[inputPath: inputPath, outputFilename: filename_ix]
|
||||
}
|
||||
def transposedOutputs = ["inputPath", "outputFilename"].collectEntries{ key ->
|
||||
[key, outputPerFile.collect{dic -> dic[key]}]
|
||||
}
|
||||
return [[key: plainName_] + transposedOutputs]
|
||||
} else {
|
||||
def value_ = java.nio.file.Paths.get(filename)
|
||||
def inputPath = value instanceof File ? value.toPath() : value
|
||||
return [[inputPath: [inputPath], outputFilename: [filename]]]
|
||||
}
|
||||
}
|
||||
|
||||
def inputPaths = processedState.collectMany{it.inputPath}
|
||||
def outputFilenames = processedState.collectMany{it.outputFilename}
|
||||
|
||||
|
||||
[id_, inputPaths, outputFilenames]
|
||||
}
|
||||
| publishFilesProc
|
||||
emit: input_ch
|
||||
}
|
||||
return publishFilesSimpleWf
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
// helper file: 'src/main/resources/io/viash/runners/nextflow/states/publishStates.nf'
|
||||
def collectFiles(obj) {
|
||||
if (obj instanceof java.io.File || obj instanceof Path) {
|
||||
@@ -1726,8 +1885,6 @@ def publishStates(Map args) {
|
||||
|
||||
// the input files and the target output filenames
|
||||
def inputoutputFilenames_ = collectInputOutputPaths(state_, id_ + "." + key_).transpose()
|
||||
def inputFiles_ = inputoutputFilenames_[0]
|
||||
def outputFilenames_ = inputoutputFilenames_[1]
|
||||
|
||||
def yamlFilename = yamlTemplate_
|
||||
.replaceAll('\\$id', id_)
|
||||
@@ -1740,7 +1897,7 @@ def publishStates(Map args) {
|
||||
// convert state to yaml blob
|
||||
def yamlBlob_ = toRelativeTaggedYamlBlob([id: id_] + state_, java.nio.file.Paths.get(yamlFilename))
|
||||
|
||||
[id_, yamlBlob_, yamlFilename, inputFiles_, outputFilenames_]
|
||||
[id_, yamlBlob_, yamlFilename]
|
||||
}
|
||||
| publishStatesProc
|
||||
emit: input_ch
|
||||
@@ -1752,33 +1909,17 @@ process publishStatesProc {
|
||||
publishDir path: "${getPublishDir()}/", mode: "copy"
|
||||
tag "$id"
|
||||
input:
|
||||
tuple val(id), val(yamlBlob), val(yamlFile), path(inputFiles, stageAs: "_inputfile?/*"), val(outputFiles)
|
||||
tuple val(id), val(yamlBlob), val(yamlFile)
|
||||
output:
|
||||
tuple val(id), path{[yamlFile] + outputFiles}
|
||||
tuple val(id), path{[yamlFile]}
|
||||
script:
|
||||
def copyCommands = [
|
||||
inputFiles instanceof List ? inputFiles : [inputFiles],
|
||||
outputFiles instanceof List ? outputFiles : [outputFiles]
|
||||
]
|
||||
.transpose()
|
||||
.collectMany{infile, outfile ->
|
||||
if (infile.toString() != outfile.toString()) {
|
||||
[
|
||||
"[ -d \"\$(dirname '${outfile.toString()}')\" ] || mkdir -p \"\$(dirname '${outfile.toString()}')\"",
|
||||
"cp -r '${infile.toString()}' '${outfile.toString()}'"
|
||||
]
|
||||
} else {
|
||||
// no need to copy if infile is the same as outfile
|
||||
[]
|
||||
}
|
||||
}
|
||||
"""
|
||||
mkdir -p "\$(dirname '${yamlFile}')"
|
||||
echo "Storing state as yaml"
|
||||
echo '${yamlBlob}' > '${yamlFile}'
|
||||
echo "Copying output files to destination folder"
|
||||
${copyCommands.join("\n ")}
|
||||
"""
|
||||
mkdir -p "\$(dirname '${yamlFile}')"
|
||||
echo "Storing state as yaml"
|
||||
cat > '${yamlFile}' << HERE
|
||||
${yamlBlob}
|
||||
HERE
|
||||
"""
|
||||
}
|
||||
|
||||
|
||||
@@ -1809,13 +1950,10 @@ def publishStatesByConfig(Map args) {
|
||||
.replaceAll('\\$\\{key\\}', key_)
|
||||
def yamlDir = java.nio.file.Paths.get(yamlFilename).getParent()
|
||||
|
||||
// the processed state is a list of [key, value, inputPath, outputFilename] tuples, where
|
||||
// the processed state is a list of [key, value] tuples, where
|
||||
// - key is a String
|
||||
// - value is any object that can be serialized to a Yaml (so a String/Integer/Long/Double/Boolean, a List, a Map, or a Path)
|
||||
// - inputPath is a List[Path]
|
||||
// - outputFilename is a List[String]
|
||||
// - (key, value) are the tuples that will be saved to the state.yaml file
|
||||
// - (inputPath, outputFilename) are the files that will be copied from src to dest (relative to the state.yaml)
|
||||
def processedState =
|
||||
config.allArguments
|
||||
.findAll { it.direction == "output" }
|
||||
@@ -1832,7 +1970,7 @@ def publishStatesByConfig(Map args) {
|
||||
// in the state as-is, but is not something that needs
|
||||
// to be copied from the source path to the dest path
|
||||
if (par.type != "file") {
|
||||
return [[key: plainName_, value: value, inputPath: [], outputFilename: []]]
|
||||
return [[key: plainName_, value: value]]
|
||||
}
|
||||
// if the orig state does not contain this filename,
|
||||
// it's an optional argument for which the user specified
|
||||
@@ -1863,13 +2001,9 @@ def publishStatesByConfig(Map args) {
|
||||
if (yamlDir != null) {
|
||||
value_ = yamlDir.relativize(value_)
|
||||
}
|
||||
def inputPath = val instanceof File ? val.toPath() : val
|
||||
[value: value_, inputPath: inputPath, outputFilename: filename_ix]
|
||||
return value_
|
||||
}
|
||||
def transposedOutputs = ["value", "inputPath", "outputFilename"].collectEntries{ key ->
|
||||
[key, outputPerFile.collect{dic -> dic[key]}]
|
||||
}
|
||||
return [[key: plainName_] + transposedOutputs]
|
||||
return [["key": plainName_, "value": outputPerFile]]
|
||||
} else {
|
||||
def value_ = java.nio.file.Paths.get(filename)
|
||||
// if id contains a slash
|
||||
@@ -1877,18 +2011,17 @@ def publishStatesByConfig(Map args) {
|
||||
value_ = yamlDir.relativize(value_)
|
||||
}
|
||||
def inputPath = value instanceof File ? value.toPath() : value
|
||||
return [[key: plainName_, value: value_, inputPath: [inputPath], outputFilename: [filename]]]
|
||||
return [["key": plainName_, value: value_]]
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def updatedState_ = processedState.collectEntries{[it.key, it.value]}
|
||||
def inputPaths = processedState.collectMany{it.inputPath}
|
||||
def outputFilenames = processedState.collectMany{it.outputFilename}
|
||||
|
||||
// convert state to yaml blob
|
||||
def yamlBlob_ = toTaggedYamlBlob([id: id_] + updatedState_)
|
||||
|
||||
[id_, yamlBlob_, yamlFilename, inputPaths, outputFilenames]
|
||||
[id_, yamlBlob_, yamlFilename]
|
||||
}
|
||||
| publishStatesProc
|
||||
emit: input_ch
|
||||
@@ -2562,7 +2695,8 @@ def _debug(workflowArgs, debugKey) {
|
||||
def workflowFactory(Map args, Map defaultWfArgs, Map meta) {
|
||||
def workflowArgs = processWorkflowArgs(args, defaultWfArgs, meta)
|
||||
def key_ = workflowArgs["key"]
|
||||
|
||||
def multipleArgs = meta.config.allArguments.findAll{ it.multiple }.collect{it.plainName}
|
||||
|
||||
workflow workflowInstance {
|
||||
take: input_
|
||||
|
||||
@@ -2719,12 +2853,36 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) {
|
||||
}
|
||||
|
||||
// TODO: move some of the _meta.join_id wrangling to the safeJoin() function.
|
||||
def chInitialOutput = chArgsWithDefaults
|
||||
def chInitialOutputMulti = chArgsWithDefaults
|
||||
| _debug(workflowArgs, "processed")
|
||||
// run workflow
|
||||
| innerWorkflowFactory(workflowArgs)
|
||||
// check output tuple
|
||||
| map { id_, output_ ->
|
||||
def chInitialOutputList = chInitialOutputMulti instanceof List ? chInitialOutputMulti : [chInitialOutputMulti]
|
||||
assert chInitialOutputList.size() > 0: "should have emitted at least one output channel"
|
||||
// Add a channel ID to the events, which designates the channel the event was emitted from as a running number
|
||||
// This number is used to sort the events later when the events are gathered from across the channels.
|
||||
def chInitialOutputListWithIndexedEvents = chInitialOutputList.withIndex().collect{channel, channelIndex ->
|
||||
def newChannel = channel
|
||||
| map {tuple ->
|
||||
assert tuple instanceof List :
|
||||
"Error in module '${key_}': element in output channel should be a tuple [id, data, ...otherargs...]\n" +
|
||||
" Example: [\"id\", [input: file('foo.txt'), arg: 10]].\n" +
|
||||
" Expected class: List. Found: tuple.getClass() is ${tuple.getClass()}"
|
||||
|
||||
def newEvent = [channelIndex] + tuple
|
||||
return newEvent
|
||||
}
|
||||
return newChannel
|
||||
}
|
||||
// Put the events into 1 channel, cover case where there is only one channel is emitted
|
||||
def chInitialOutput = chInitialOutputList.size() > 1 ? \
|
||||
chInitialOutputListWithIndexedEvents[0].mix(*chInitialOutputListWithIndexedEvents.tail()) : \
|
||||
chInitialOutputListWithIndexedEvents[0]
|
||||
def chInitialOutputProcessed = chInitialOutput
|
||||
| map { tuple ->
|
||||
def channelId = tuple[0]
|
||||
def id_ = tuple[1]
|
||||
def output_ = tuple[2]
|
||||
|
||||
// see if output map contains metadata
|
||||
def meta_ =
|
||||
@@ -2737,19 +2895,94 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) {
|
||||
output_ = output_.findAll{k, v -> k != "_meta"}
|
||||
|
||||
// check value types
|
||||
output_ = _processOutputValues(output_, meta.config, id_, key_)
|
||||
output_ = _checkValidOutputArgument(output_, meta.config, id_, key_)
|
||||
|
||||
// simplify output if need be
|
||||
if (workflowArgs.auto.simplifyOutput && output_.size() == 1) {
|
||||
output_ = output_.values()[0]
|
||||
}
|
||||
|
||||
[join_id, id_, output_]
|
||||
[join_id, channelId, id_, output_]
|
||||
}
|
||||
// | view{"chInitialOutput: ${it.take(3)}"}
|
||||
|
||||
// join the output [prev_id, channel_id, new_id, output] with the previous state [prev_id, state, ...]
|
||||
def chPublishWithPreviousState = safeJoin(chInitialOutputProcessed, chRunFiltered, key_)
|
||||
// input tuple format: [join_id, channel_id, id, output, prev_state, ...]
|
||||
// output tuple format: [join_id, channel_id, id, new_state, ...]
|
||||
| map{ tup ->
|
||||
def new_state = workflowArgs.toState(tup.drop(2).take(3))
|
||||
tup.take(3) + [new_state] + tup.drop(5)
|
||||
}
|
||||
if (workflowArgs.auto.publish == "state") {
|
||||
def chPublishFiles = chPublishWithPreviousState
|
||||
// input tuple format: [join_id, channel_id, id, new_state, ...]
|
||||
// output tuple format: [join_id, channel_id, id, new_state]
|
||||
| map{ tup ->
|
||||
tup.take(4)
|
||||
}
|
||||
|
||||
safeJoin(chPublishFiles, chArgsWithDefaults, key_)
|
||||
// input tuple format: [join_id, channel_id, id, new_state, orig_state, ...]
|
||||
// output tuple format: [id, new_state, orig_state]
|
||||
| map { tup ->
|
||||
tup.drop(2).take(3)
|
||||
}
|
||||
| publishFilesByConfig(key: key_, config: meta.config)
|
||||
}
|
||||
// Join the state from the events that were emitted from different channels
|
||||
def chJoined = chInitialOutputProcessed
|
||||
| map {tuple ->
|
||||
def join_id = tuple[0]
|
||||
def channel_id = tuple[1]
|
||||
def id = tuple[2]
|
||||
def other = tuple.drop(3)
|
||||
// Below, groupTuple is used to join the events. To make sure resuming a workflow
|
||||
// keeps working, the output state must be deterministic. This means the state needs to be
|
||||
// sorted with groupTuple's has a 'sort' argument. This argument can be set to 'hash',
|
||||
// but hashing the state when it is large can be problematic in terms of performance.
|
||||
// Therefore, a custom comparator function is provided. We add the channel ID to the
|
||||
// states so that we can use the channel ID to sort the items.
|
||||
def stateWithChannelID = [[channel_id] * other.size(), other].transpose()
|
||||
// A comparator that is provided to groupTuple's 'sort' argument is applied
|
||||
// to all elements of the event tuple (that is not the 'id'). The comparator
|
||||
// closure that is used below expects the input to be List. So the join_id and
|
||||
// channel_id must also be wrapped in a list.
|
||||
[[join_id], [channel_id], id] + stateWithChannelID
|
||||
}
|
||||
| groupTuple(by: 2, sort: {a, b -> a[0] <=> b[0]}, size: chInitialOutputList.size(), remainder: true)
|
||||
| map {join_ids, _, id, statesWithChannelID ->
|
||||
// Remove the channel IDs from the states
|
||||
def states = statesWithChannelID.collect{it[1]}
|
||||
def newJoinId = join_ids.flatten().unique{a, b -> a <=> b}
|
||||
assert newJoinId.size() == 1: "Multiple events were emitted for '$id'."
|
||||
def newJoinIdUnique = newJoinId[0]
|
||||
|
||||
// Merge the states from the different channels
|
||||
def newState = states.inject([:]){ old_state, state_to_add ->
|
||||
return old_state + state_to_add.collectEntries{k, v ->
|
||||
if (!multipleArgs.contains(k)) {
|
||||
// if the key is not a multiple argument, we expect only one value
|
||||
if (old_state.containsKey(k)) {
|
||||
assert old_state[k] == v : "ID $id: multiple entries for argument $k were emitted."
|
||||
}
|
||||
[k, v]
|
||||
} else {
|
||||
// if the key is a multiple argument, append the different values into one list
|
||||
def prevValue = old_state.getOrDefault(k, [])
|
||||
def prevValueAsList = prevValue instanceof List ? prevValue : [prevValue]
|
||||
[k, prevValueAsList + v]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_checkAllRequiredOuputsPresent(newState, meta.config, id, key_)
|
||||
|
||||
// simplify output if need be
|
||||
if (workflowArgs.auto.simplifyOutput && newState.size() == 1) {
|
||||
newState = newState.values()[0]
|
||||
}
|
||||
|
||||
return [newJoinIdUnique, id, newState]
|
||||
}
|
||||
|
||||
// join the output [prev_id, new_id, output] with the previous state [prev_id, state, ...]
|
||||
def chNewState = safeJoin(chInitialOutput, chRunFiltered, key_)
|
||||
def chNewState = safeJoin(chJoined, chRunFiltered, key_)
|
||||
// input tuple format: [join_id, id, output, prev_state, ...]
|
||||
// output tuple format: [join_id, id, new_state, ...]
|
||||
| map{ tup ->
|
||||
@@ -2758,23 +2991,21 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) {
|
||||
}
|
||||
|
||||
if (workflowArgs.auto.publish == "state") {
|
||||
def chPublish = chNewState
|
||||
def chPublishStates = chNewState
|
||||
// input tuple format: [join_id, id, new_state, ...]
|
||||
// output tuple format: [join_id, id, new_state]
|
||||
| map{ tup ->
|
||||
tup.take(3)
|
||||
}
|
||||
|
||||
safeJoin(chPublish, chArgsWithDefaults, key_)
|
||||
safeJoin(chPublishStates, chArgsWithDefaults, key_)
|
||||
// input tuple format: [join_id, id, new_state, orig_state, ...]
|
||||
// output tuple format: [id, new_state, orig_state]
|
||||
| map { tup ->
|
||||
tup.drop(1).take(3)
|
||||
}
|
||||
}
|
||||
| publishStatesByConfig(key: key_, config: meta.config)
|
||||
}
|
||||
|
||||
// remove join_id and meta
|
||||
chReturn = chNewState
|
||||
| map { tup ->
|
||||
// input tuple format: [join_id, id, new_state, ...]
|
||||
@@ -3246,6 +3477,10 @@ meta = [
|
||||
"licence" : "GPL v3 or later"
|
||||
},
|
||||
"status" : "enabled",
|
||||
"scope" : {
|
||||
"image" : "public",
|
||||
"target" : "public"
|
||||
},
|
||||
"requirements" : {
|
||||
"commands" : [
|
||||
"ps"
|
||||
@@ -3365,16 +3600,16 @@ meta = [
|
||||
"runner" : "nextflow",
|
||||
"engine" : "docker|native",
|
||||
"output" : "target/nextflow/multiqc",
|
||||
"viash_version" : "0.9.0",
|
||||
"git_commit" : "952ff0843093b538cbfd6fefdecf2e7a0bc9e70b",
|
||||
"git_remote" : "https://x-access-token:ghs_EwAUAMYJ0K4VBHlAEMs4ZP2OyQYqJM0PSfEO@github.com/viash-hub/biobox",
|
||||
"git_tag" : "v0.2.0-27-g952ff08"
|
||||
"viash_version" : "0.9.2",
|
||||
"git_commit" : "5f6516e9c0d95c84f3d4159a67d3de19d3ae1fde",
|
||||
"git_remote" : "https://github.com/viash-hub/biobox",
|
||||
"git_tag" : "v0.2.0-30-g5f6516e"
|
||||
},
|
||||
"package_config" : {
|
||||
"name" : "biobox",
|
||||
"version" : "main",
|
||||
"description" : "A collection of bioinformatics tools for working with sequence data.\n",
|
||||
"viash_version" : "0.9.0",
|
||||
"viash_version" : "0.9.2",
|
||||
"source" : "src",
|
||||
"target" : "target",
|
||||
"config_mods" : [
|
||||
|
||||
@@ -295,6 +295,9 @@ test_resources:
|
||||
path: "test_data"
|
||||
info: null
|
||||
status: "enabled"
|
||||
scope:
|
||||
image: "public"
|
||||
target: "public"
|
||||
requirements:
|
||||
commands:
|
||||
- "ps"
|
||||
@@ -400,16 +403,16 @@ build_info:
|
||||
engine: "docker|native"
|
||||
output: "target/nextflow/samtools/samtools_stats"
|
||||
executable: "target/nextflow/samtools/samtools_stats/main.nf"
|
||||
viash_version: "0.9.0"
|
||||
git_commit: "952ff0843093b538cbfd6fefdecf2e7a0bc9e70b"
|
||||
git_remote: "https://x-access-token:ghs_EwAUAMYJ0K4VBHlAEMs4ZP2OyQYqJM0PSfEO@github.com/viash-hub/biobox"
|
||||
git_tag: "v0.2.0-27-g952ff08"
|
||||
viash_version: "0.9.2"
|
||||
git_commit: "5f6516e9c0d95c84f3d4159a67d3de19d3ae1fde"
|
||||
git_remote: "https://github.com/viash-hub/biobox"
|
||||
git_tag: "v0.2.0-30-g5f6516e"
|
||||
package_config:
|
||||
name: "biobox"
|
||||
version: "main"
|
||||
description: "A collection of bioinformatics tools for working with sequence data.\n"
|
||||
info: null
|
||||
viash_version: "0.9.0"
|
||||
viash_version: "0.9.2"
|
||||
source: "src"
|
||||
target: "target"
|
||||
config_mods:
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
// samtools_stats main
|
||||
//
|
||||
// This wrapper script is auto-generated by viash 0.9.0 and is thus a derivative
|
||||
// This wrapper script is auto-generated by viash 0.9.2 and is thus a derivative
|
||||
// work thereof. This software comes with ABSOLUTELY NO WARRANTY from Data
|
||||
// Intuitive.
|
||||
//
|
||||
@@ -176,7 +176,7 @@ def _checkArgumentType(String stage, Map par, Object value, String errorIdentifi
|
||||
Map _processInputValues(Map inputs, Map config, String id, String key) {
|
||||
if (!workflow.stubRun) {
|
||||
config.allArguments.each { arg ->
|
||||
if (arg.required) {
|
||||
if (arg.required && arg.direction == "input") {
|
||||
assert inputs.containsKey(arg.plainName) && inputs.get(arg.plainName) != null :
|
||||
"Error in module '${key}' id '${id}': required input argument '${arg.plainName}' is missing"
|
||||
}
|
||||
@@ -195,15 +195,8 @@ Map _processInputValues(Map inputs, Map config, String id, String key) {
|
||||
}
|
||||
|
||||
// helper file: 'src/main/resources/io/viash/runners/nextflow/arguments/_processOutputValues.nf'
|
||||
Map _processOutputValues(Map outputs, Map config, String id, String key) {
|
||||
Map _checkValidOutputArgument(Map outputs, Map config, String id, String key) {
|
||||
if (!workflow.stubRun) {
|
||||
config.allArguments.each { arg ->
|
||||
if (arg.direction == "output" && arg.required) {
|
||||
assert outputs.containsKey(arg.plainName) && outputs.get(arg.plainName) != null :
|
||||
"Error in module '${key}' id '${id}': required output argument '${arg.plainName}' is missing"
|
||||
}
|
||||
}
|
||||
|
||||
outputs = outputs.collectEntries { name, value ->
|
||||
def par = config.allArguments.find { it.plainName == name && it.direction == "output" }
|
||||
assert par != null : "Error in module '${key}' id '${id}': '${name}' is not a valid output argument"
|
||||
@@ -216,6 +209,16 @@ Map _processOutputValues(Map outputs, Map config, String id, String key) {
|
||||
return outputs
|
||||
}
|
||||
|
||||
void _checkAllRequiredOuputsPresent(Map outputs, Map config, String id, String key) {
|
||||
if (!workflow.stubRun) {
|
||||
config.allArguments.each { arg ->
|
||||
if (arg.direction == "output" && arg.required) {
|
||||
assert outputs.containsKey(arg.plainName) && outputs.get(arg.plainName) != null :
|
||||
"Error in module '${key}' id '${id}': required output argument '${arg.plainName}' is missing"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// helper file: 'src/main/resources/io/viash/runners/nextflow/channel/IDChecker.nf'
|
||||
class IDChecker {
|
||||
final def items = [] as Set
|
||||
@@ -1669,6 +1672,162 @@ def joinStates(Closure apply_) {
|
||||
}
|
||||
return joinStatesWf
|
||||
}
|
||||
// helper file: 'src/main/resources/io/viash/runners/nextflow/states/publishFiles.nf'
|
||||
def publishFiles(Map args) {
|
||||
def key_ = args.get("key")
|
||||
|
||||
assert key_ != null : "publishFiles: key must be specified"
|
||||
|
||||
workflow publishFilesWf {
|
||||
take: input_ch
|
||||
main:
|
||||
input_ch
|
||||
| map { tup ->
|
||||
def id_ = tup[0]
|
||||
def state_ = tup[1]
|
||||
|
||||
// the input files and the target output filenames
|
||||
def inputoutputFilenames_ = collectInputOutputPaths(state_, id_ + "." + key_).transpose()
|
||||
def inputFiles_ = inputoutputFilenames_[0]
|
||||
def outputFilenames_ = inputoutputFilenames_[1]
|
||||
|
||||
[id_, inputFiles_, outputFilenames_]
|
||||
}
|
||||
| publishFilesProc
|
||||
emit: input_ch
|
||||
}
|
||||
return publishFilesWf
|
||||
}
|
||||
|
||||
process publishFilesProc {
|
||||
// todo: check publishpath?
|
||||
publishDir path: "${getPublishDir()}/", mode: "copy"
|
||||
tag "$id"
|
||||
input:
|
||||
tuple val(id), path(inputFiles, stageAs: "_inputfile?/*"), val(outputFiles)
|
||||
output:
|
||||
tuple val(id), path{outputFiles}
|
||||
script:
|
||||
def copyCommands = [
|
||||
inputFiles instanceof List ? inputFiles : [inputFiles],
|
||||
outputFiles instanceof List ? outputFiles : [outputFiles]
|
||||
]
|
||||
.transpose()
|
||||
.collectMany{infile, outfile ->
|
||||
if (infile.toString() != outfile.toString()) {
|
||||
[
|
||||
"[ -d \"\$(dirname '${outfile.toString()}')\" ] || mkdir -p \"\$(dirname '${outfile.toString()}')\"",
|
||||
"cp -r '${infile.toString()}' '${outfile.toString()}'"
|
||||
]
|
||||
} else {
|
||||
// no need to copy if infile is the same as outfile
|
||||
[]
|
||||
}
|
||||
}
|
||||
"""
|
||||
echo "Copying output files to destination folder"
|
||||
${copyCommands.join("\n ")}
|
||||
"""
|
||||
}
|
||||
|
||||
|
||||
// this assumes that the state contains no other values other than those specified in the config
|
||||
def publishFilesByConfig(Map args) {
|
||||
def config = args.get("config")
|
||||
assert config != null : "publishFilesByConfig: config must be specified"
|
||||
|
||||
def key_ = args.get("key", config.name)
|
||||
assert key_ != null : "publishFilesByConfig: key must be specified"
|
||||
|
||||
workflow publishFilesSimpleWf {
|
||||
take: input_ch
|
||||
main:
|
||||
input_ch
|
||||
| map { tup ->
|
||||
def id_ = tup[0]
|
||||
def state_ = tup[1] // e.g. [output: new File("myoutput.h5ad"), k: 10]
|
||||
def origState_ = tup[2] // e.g. [output: '$id.$key.foo.h5ad']
|
||||
|
||||
|
||||
// the processed state is a list of [key, value, inputPath, outputFilename] tuples, where
|
||||
// - key is a String
|
||||
// - value is any object that can be serialized to a Yaml (so a String/Integer/Long/Double/Boolean, a List, a Map, or a Path)
|
||||
// - inputPath is a List[Path]
|
||||
// - outputFilename is a List[String]
|
||||
// - (inputPath, outputFilename) are the files that will be copied from src to dest (relative to the state.yaml)
|
||||
def processedState =
|
||||
config.allArguments
|
||||
.findAll { it.direction == "output" }
|
||||
.collectMany { par ->
|
||||
def plainName_ = par.plainName
|
||||
// if the state does not contain the key, it's an
|
||||
// optional argument for which the component did
|
||||
// not generate any output OR multiple channels were emitted
|
||||
// and the output was just not added to using the channel
|
||||
// that is now being parsed
|
||||
if (!state_.containsKey(plainName_)) {
|
||||
return []
|
||||
}
|
||||
def value = state_[plainName_]
|
||||
// if the parameter is not a file, it should be stored
|
||||
// in the state as-is, but is not something that needs
|
||||
// to be copied from the source path to the dest path
|
||||
if (par.type != "file") {
|
||||
return [[inputPath: [], outputFilename: []]]
|
||||
}
|
||||
// if the orig state does not contain this filename,
|
||||
// it's an optional argument for which the user specified
|
||||
// that it should not be returned as a state
|
||||
if (!origState_.containsKey(plainName_)) {
|
||||
return []
|
||||
}
|
||||
def filenameTemplate = origState_[plainName_]
|
||||
// if the pararameter is multiple: true, fetch the template
|
||||
if (par.multiple && filenameTemplate instanceof List) {
|
||||
filenameTemplate = filenameTemplate[0]
|
||||
}
|
||||
// instantiate the template
|
||||
def filename = filenameTemplate
|
||||
.replaceAll('\\$id', id_)
|
||||
.replaceAll('\\$\\{id\\}', id_)
|
||||
.replaceAll('\\$key', key_)
|
||||
.replaceAll('\\$\\{key\\}', key_)
|
||||
if (par.multiple) {
|
||||
// if the parameter is multiple: true, the filename
|
||||
// should contain a wildcard '*' that is replaced with
|
||||
// the index of the file
|
||||
assert filename.contains("*") : "Module '${key_}' id '${id_}': Multiple output files specified, but no wildcard '*' in the filename: ${filename}"
|
||||
def outputPerFile = value.withIndex().collect{ val, ix ->
|
||||
def filename_ix = filename.replace("*", ix.toString())
|
||||
def inputPath = val instanceof File ? val.toPath() : val
|
||||
[inputPath: inputPath, outputFilename: filename_ix]
|
||||
}
|
||||
def transposedOutputs = ["inputPath", "outputFilename"].collectEntries{ key ->
|
||||
[key, outputPerFile.collect{dic -> dic[key]}]
|
||||
}
|
||||
return [[key: plainName_] + transposedOutputs]
|
||||
} else {
|
||||
def value_ = java.nio.file.Paths.get(filename)
|
||||
def inputPath = value instanceof File ? value.toPath() : value
|
||||
return [[inputPath: [inputPath], outputFilename: [filename]]]
|
||||
}
|
||||
}
|
||||
|
||||
def inputPaths = processedState.collectMany{it.inputPath}
|
||||
def outputFilenames = processedState.collectMany{it.outputFilename}
|
||||
|
||||
|
||||
[id_, inputPaths, outputFilenames]
|
||||
}
|
||||
| publishFilesProc
|
||||
emit: input_ch
|
||||
}
|
||||
return publishFilesSimpleWf
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
// helper file: 'src/main/resources/io/viash/runners/nextflow/states/publishStates.nf'
|
||||
def collectFiles(obj) {
|
||||
if (obj instanceof java.io.File || obj instanceof Path) {
|
||||
@@ -1726,8 +1885,6 @@ def publishStates(Map args) {
|
||||
|
||||
// the input files and the target output filenames
|
||||
def inputoutputFilenames_ = collectInputOutputPaths(state_, id_ + "." + key_).transpose()
|
||||
def inputFiles_ = inputoutputFilenames_[0]
|
||||
def outputFilenames_ = inputoutputFilenames_[1]
|
||||
|
||||
def yamlFilename = yamlTemplate_
|
||||
.replaceAll('\\$id', id_)
|
||||
@@ -1740,7 +1897,7 @@ def publishStates(Map args) {
|
||||
// convert state to yaml blob
|
||||
def yamlBlob_ = toRelativeTaggedYamlBlob([id: id_] + state_, java.nio.file.Paths.get(yamlFilename))
|
||||
|
||||
[id_, yamlBlob_, yamlFilename, inputFiles_, outputFilenames_]
|
||||
[id_, yamlBlob_, yamlFilename]
|
||||
}
|
||||
| publishStatesProc
|
||||
emit: input_ch
|
||||
@@ -1752,33 +1909,17 @@ process publishStatesProc {
|
||||
publishDir path: "${getPublishDir()}/", mode: "copy"
|
||||
tag "$id"
|
||||
input:
|
||||
tuple val(id), val(yamlBlob), val(yamlFile), path(inputFiles, stageAs: "_inputfile?/*"), val(outputFiles)
|
||||
tuple val(id), val(yamlBlob), val(yamlFile)
|
||||
output:
|
||||
tuple val(id), path{[yamlFile] + outputFiles}
|
||||
tuple val(id), path{[yamlFile]}
|
||||
script:
|
||||
def copyCommands = [
|
||||
inputFiles instanceof List ? inputFiles : [inputFiles],
|
||||
outputFiles instanceof List ? outputFiles : [outputFiles]
|
||||
]
|
||||
.transpose()
|
||||
.collectMany{infile, outfile ->
|
||||
if (infile.toString() != outfile.toString()) {
|
||||
[
|
||||
"[ -d \"\$(dirname '${outfile.toString()}')\" ] || mkdir -p \"\$(dirname '${outfile.toString()}')\"",
|
||||
"cp -r '${infile.toString()}' '${outfile.toString()}'"
|
||||
]
|
||||
} else {
|
||||
// no need to copy if infile is the same as outfile
|
||||
[]
|
||||
}
|
||||
}
|
||||
"""
|
||||
mkdir -p "\$(dirname '${yamlFile}')"
|
||||
echo "Storing state as yaml"
|
||||
echo '${yamlBlob}' > '${yamlFile}'
|
||||
echo "Copying output files to destination folder"
|
||||
${copyCommands.join("\n ")}
|
||||
"""
|
||||
mkdir -p "\$(dirname '${yamlFile}')"
|
||||
echo "Storing state as yaml"
|
||||
cat > '${yamlFile}' << HERE
|
||||
${yamlBlob}
|
||||
HERE
|
||||
"""
|
||||
}
|
||||
|
||||
|
||||
@@ -1809,13 +1950,10 @@ def publishStatesByConfig(Map args) {
|
||||
.replaceAll('\\$\\{key\\}', key_)
|
||||
def yamlDir = java.nio.file.Paths.get(yamlFilename).getParent()
|
||||
|
||||
// the processed state is a list of [key, value, inputPath, outputFilename] tuples, where
|
||||
// the processed state is a list of [key, value] tuples, where
|
||||
// - key is a String
|
||||
// - value is any object that can be serialized to a Yaml (so a String/Integer/Long/Double/Boolean, a List, a Map, or a Path)
|
||||
// - inputPath is a List[Path]
|
||||
// - outputFilename is a List[String]
|
||||
// - (key, value) are the tuples that will be saved to the state.yaml file
|
||||
// - (inputPath, outputFilename) are the files that will be copied from src to dest (relative to the state.yaml)
|
||||
def processedState =
|
||||
config.allArguments
|
||||
.findAll { it.direction == "output" }
|
||||
@@ -1832,7 +1970,7 @@ def publishStatesByConfig(Map args) {
|
||||
// in the state as-is, but is not something that needs
|
||||
// to be copied from the source path to the dest path
|
||||
if (par.type != "file") {
|
||||
return [[key: plainName_, value: value, inputPath: [], outputFilename: []]]
|
||||
return [[key: plainName_, value: value]]
|
||||
}
|
||||
// if the orig state does not contain this filename,
|
||||
// it's an optional argument for which the user specified
|
||||
@@ -1863,13 +2001,9 @@ def publishStatesByConfig(Map args) {
|
||||
if (yamlDir != null) {
|
||||
value_ = yamlDir.relativize(value_)
|
||||
}
|
||||
def inputPath = val instanceof File ? val.toPath() : val
|
||||
[value: value_, inputPath: inputPath, outputFilename: filename_ix]
|
||||
return value_
|
||||
}
|
||||
def transposedOutputs = ["value", "inputPath", "outputFilename"].collectEntries{ key ->
|
||||
[key, outputPerFile.collect{dic -> dic[key]}]
|
||||
}
|
||||
return [[key: plainName_] + transposedOutputs]
|
||||
return [["key": plainName_, "value": outputPerFile]]
|
||||
} else {
|
||||
def value_ = java.nio.file.Paths.get(filename)
|
||||
// if id contains a slash
|
||||
@@ -1877,18 +2011,17 @@ def publishStatesByConfig(Map args) {
|
||||
value_ = yamlDir.relativize(value_)
|
||||
}
|
||||
def inputPath = value instanceof File ? value.toPath() : value
|
||||
return [[key: plainName_, value: value_, inputPath: [inputPath], outputFilename: [filename]]]
|
||||
return [["key": plainName_, value: value_]]
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def updatedState_ = processedState.collectEntries{[it.key, it.value]}
|
||||
def inputPaths = processedState.collectMany{it.inputPath}
|
||||
def outputFilenames = processedState.collectMany{it.outputFilename}
|
||||
|
||||
// convert state to yaml blob
|
||||
def yamlBlob_ = toTaggedYamlBlob([id: id_] + updatedState_)
|
||||
|
||||
[id_, yamlBlob_, yamlFilename, inputPaths, outputFilenames]
|
||||
[id_, yamlBlob_, yamlFilename]
|
||||
}
|
||||
| publishStatesProc
|
||||
emit: input_ch
|
||||
@@ -2562,7 +2695,8 @@ def _debug(workflowArgs, debugKey) {
|
||||
def workflowFactory(Map args, Map defaultWfArgs, Map meta) {
|
||||
def workflowArgs = processWorkflowArgs(args, defaultWfArgs, meta)
|
||||
def key_ = workflowArgs["key"]
|
||||
|
||||
def multipleArgs = meta.config.allArguments.findAll{ it.multiple }.collect{it.plainName}
|
||||
|
||||
workflow workflowInstance {
|
||||
take: input_
|
||||
|
||||
@@ -2719,12 +2853,36 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) {
|
||||
}
|
||||
|
||||
// TODO: move some of the _meta.join_id wrangling to the safeJoin() function.
|
||||
def chInitialOutput = chArgsWithDefaults
|
||||
def chInitialOutputMulti = chArgsWithDefaults
|
||||
| _debug(workflowArgs, "processed")
|
||||
// run workflow
|
||||
| innerWorkflowFactory(workflowArgs)
|
||||
// check output tuple
|
||||
| map { id_, output_ ->
|
||||
def chInitialOutputList = chInitialOutputMulti instanceof List ? chInitialOutputMulti : [chInitialOutputMulti]
|
||||
assert chInitialOutputList.size() > 0: "should have emitted at least one output channel"
|
||||
// Add a channel ID to the events, which designates the channel the event was emitted from as a running number
|
||||
// This number is used to sort the events later when the events are gathered from across the channels.
|
||||
def chInitialOutputListWithIndexedEvents = chInitialOutputList.withIndex().collect{channel, channelIndex ->
|
||||
def newChannel = channel
|
||||
| map {tuple ->
|
||||
assert tuple instanceof List :
|
||||
"Error in module '${key_}': element in output channel should be a tuple [id, data, ...otherargs...]\n" +
|
||||
" Example: [\"id\", [input: file('foo.txt'), arg: 10]].\n" +
|
||||
" Expected class: List. Found: tuple.getClass() is ${tuple.getClass()}"
|
||||
|
||||
def newEvent = [channelIndex] + tuple
|
||||
return newEvent
|
||||
}
|
||||
return newChannel
|
||||
}
|
||||
// Put the events into 1 channel, cover case where there is only one channel is emitted
|
||||
def chInitialOutput = chInitialOutputList.size() > 1 ? \
|
||||
chInitialOutputListWithIndexedEvents[0].mix(*chInitialOutputListWithIndexedEvents.tail()) : \
|
||||
chInitialOutputListWithIndexedEvents[0]
|
||||
def chInitialOutputProcessed = chInitialOutput
|
||||
| map { tuple ->
|
||||
def channelId = tuple[0]
|
||||
def id_ = tuple[1]
|
||||
def output_ = tuple[2]
|
||||
|
||||
// see if output map contains metadata
|
||||
def meta_ =
|
||||
@@ -2737,19 +2895,94 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) {
|
||||
output_ = output_.findAll{k, v -> k != "_meta"}
|
||||
|
||||
// check value types
|
||||
output_ = _processOutputValues(output_, meta.config, id_, key_)
|
||||
output_ = _checkValidOutputArgument(output_, meta.config, id_, key_)
|
||||
|
||||
// simplify output if need be
|
||||
if (workflowArgs.auto.simplifyOutput && output_.size() == 1) {
|
||||
output_ = output_.values()[0]
|
||||
}
|
||||
|
||||
[join_id, id_, output_]
|
||||
[join_id, channelId, id_, output_]
|
||||
}
|
||||
// | view{"chInitialOutput: ${it.take(3)}"}
|
||||
|
||||
// join the output [prev_id, channel_id, new_id, output] with the previous state [prev_id, state, ...]
|
||||
def chPublishWithPreviousState = safeJoin(chInitialOutputProcessed, chRunFiltered, key_)
|
||||
// input tuple format: [join_id, channel_id, id, output, prev_state, ...]
|
||||
// output tuple format: [join_id, channel_id, id, new_state, ...]
|
||||
| map{ tup ->
|
||||
def new_state = workflowArgs.toState(tup.drop(2).take(3))
|
||||
tup.take(3) + [new_state] + tup.drop(5)
|
||||
}
|
||||
if (workflowArgs.auto.publish == "state") {
|
||||
def chPublishFiles = chPublishWithPreviousState
|
||||
// input tuple format: [join_id, channel_id, id, new_state, ...]
|
||||
// output tuple format: [join_id, channel_id, id, new_state]
|
||||
| map{ tup ->
|
||||
tup.take(4)
|
||||
}
|
||||
|
||||
safeJoin(chPublishFiles, chArgsWithDefaults, key_)
|
||||
// input tuple format: [join_id, channel_id, id, new_state, orig_state, ...]
|
||||
// output tuple format: [id, new_state, orig_state]
|
||||
| map { tup ->
|
||||
tup.drop(2).take(3)
|
||||
}
|
||||
| publishFilesByConfig(key: key_, config: meta.config)
|
||||
}
|
||||
// Join the state from the events that were emitted from different channels
|
||||
def chJoined = chInitialOutputProcessed
|
||||
| map {tuple ->
|
||||
def join_id = tuple[0]
|
||||
def channel_id = tuple[1]
|
||||
def id = tuple[2]
|
||||
def other = tuple.drop(3)
|
||||
// Below, groupTuple is used to join the events. To make sure resuming a workflow
|
||||
// keeps working, the output state must be deterministic. This means the state needs to be
|
||||
// sorted with groupTuple's has a 'sort' argument. This argument can be set to 'hash',
|
||||
// but hashing the state when it is large can be problematic in terms of performance.
|
||||
// Therefore, a custom comparator function is provided. We add the channel ID to the
|
||||
// states so that we can use the channel ID to sort the items.
|
||||
def stateWithChannelID = [[channel_id] * other.size(), other].transpose()
|
||||
// A comparator that is provided to groupTuple's 'sort' argument is applied
|
||||
// to all elements of the event tuple (that is not the 'id'). The comparator
|
||||
// closure that is used below expects the input to be List. So the join_id and
|
||||
// channel_id must also be wrapped in a list.
|
||||
[[join_id], [channel_id], id] + stateWithChannelID
|
||||
}
|
||||
| groupTuple(by: 2, sort: {a, b -> a[0] <=> b[0]}, size: chInitialOutputList.size(), remainder: true)
|
||||
| map {join_ids, _, id, statesWithChannelID ->
|
||||
// Remove the channel IDs from the states
|
||||
def states = statesWithChannelID.collect{it[1]}
|
||||
def newJoinId = join_ids.flatten().unique{a, b -> a <=> b}
|
||||
assert newJoinId.size() == 1: "Multiple events were emitted for '$id'."
|
||||
def newJoinIdUnique = newJoinId[0]
|
||||
|
||||
// Merge the states from the different channels
|
||||
def newState = states.inject([:]){ old_state, state_to_add ->
|
||||
return old_state + state_to_add.collectEntries{k, v ->
|
||||
if (!multipleArgs.contains(k)) {
|
||||
// if the key is not a multiple argument, we expect only one value
|
||||
if (old_state.containsKey(k)) {
|
||||
assert old_state[k] == v : "ID $id: multiple entries for argument $k were emitted."
|
||||
}
|
||||
[k, v]
|
||||
} else {
|
||||
// if the key is a multiple argument, append the different values into one list
|
||||
def prevValue = old_state.getOrDefault(k, [])
|
||||
def prevValueAsList = prevValue instanceof List ? prevValue : [prevValue]
|
||||
[k, prevValueAsList + v]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_checkAllRequiredOuputsPresent(newState, meta.config, id, key_)
|
||||
|
||||
// simplify output if need be
|
||||
if (workflowArgs.auto.simplifyOutput && newState.size() == 1) {
|
||||
newState = newState.values()[0]
|
||||
}
|
||||
|
||||
return [newJoinIdUnique, id, newState]
|
||||
}
|
||||
|
||||
// join the output [prev_id, new_id, output] with the previous state [prev_id, state, ...]
|
||||
def chNewState = safeJoin(chInitialOutput, chRunFiltered, key_)
|
||||
def chNewState = safeJoin(chJoined, chRunFiltered, key_)
|
||||
// input tuple format: [join_id, id, output, prev_state, ...]
|
||||
// output tuple format: [join_id, id, new_state, ...]
|
||||
| map{ tup ->
|
||||
@@ -2758,23 +2991,21 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) {
|
||||
}
|
||||
|
||||
if (workflowArgs.auto.publish == "state") {
|
||||
def chPublish = chNewState
|
||||
def chPublishStates = chNewState
|
||||
// input tuple format: [join_id, id, new_state, ...]
|
||||
// output tuple format: [join_id, id, new_state]
|
||||
| map{ tup ->
|
||||
tup.take(3)
|
||||
}
|
||||
|
||||
safeJoin(chPublish, chArgsWithDefaults, key_)
|
||||
safeJoin(chPublishStates, chArgsWithDefaults, key_)
|
||||
// input tuple format: [join_id, id, new_state, orig_state, ...]
|
||||
// output tuple format: [id, new_state, orig_state]
|
||||
| map { tup ->
|
||||
tup.drop(1).take(3)
|
||||
}
|
||||
}
|
||||
| publishStatesByConfig(key: key_, config: meta.config)
|
||||
}
|
||||
|
||||
// remove join_id and meta
|
||||
chReturn = chNewState
|
||||
| map { tup ->
|
||||
// input tuple format: [join_id, id, new_state, ...]
|
||||
@@ -3169,6 +3400,10 @@ meta = [
|
||||
}
|
||||
],
|
||||
"status" : "enabled",
|
||||
"scope" : {
|
||||
"image" : "public",
|
||||
"target" : "public"
|
||||
},
|
||||
"requirements" : {
|
||||
"commands" : [
|
||||
"ps"
|
||||
@@ -3294,16 +3529,16 @@ meta = [
|
||||
"runner" : "nextflow",
|
||||
"engine" : "docker|native",
|
||||
"output" : "target/nextflow/samtools/samtools_stats",
|
||||
"viash_version" : "0.9.0",
|
||||
"git_commit" : "952ff0843093b538cbfd6fefdecf2e7a0bc9e70b",
|
||||
"git_remote" : "https://x-access-token:ghs_EwAUAMYJ0K4VBHlAEMs4ZP2OyQYqJM0PSfEO@github.com/viash-hub/biobox",
|
||||
"git_tag" : "v0.2.0-27-g952ff08"
|
||||
"viash_version" : "0.9.2",
|
||||
"git_commit" : "5f6516e9c0d95c84f3d4159a67d3de19d3ae1fde",
|
||||
"git_remote" : "https://github.com/viash-hub/biobox",
|
||||
"git_tag" : "v0.2.0-30-g5f6516e"
|
||||
},
|
||||
"package_config" : {
|
||||
"name" : "biobox",
|
||||
"version" : "main",
|
||||
"description" : "A collection of bioinformatics tools for working with sequence data.\n",
|
||||
"viash_version" : "0.9.0",
|
||||
"viash_version" : "0.9.2",
|
||||
"source" : "src",
|
||||
"target" : "target",
|
||||
"config_mods" : [
|
||||
|
||||
@@ -2540,6 +2540,9 @@ test_resources:
|
||||
is_executable: true
|
||||
info: null
|
||||
status: "enabled"
|
||||
scope:
|
||||
image: "public"
|
||||
target: "public"
|
||||
requirements:
|
||||
commands:
|
||||
- "ps"
|
||||
@@ -2662,16 +2665,16 @@ build_info:
|
||||
engine: "docker|native"
|
||||
output: "target/nextflow/star/star_align_reads"
|
||||
executable: "target/nextflow/star/star_align_reads/main.nf"
|
||||
viash_version: "0.9.0"
|
||||
git_commit: "952ff0843093b538cbfd6fefdecf2e7a0bc9e70b"
|
||||
git_remote: "https://x-access-token:ghs_EwAUAMYJ0K4VBHlAEMs4ZP2OyQYqJM0PSfEO@github.com/viash-hub/biobox"
|
||||
git_tag: "v0.2.0-27-g952ff08"
|
||||
viash_version: "0.9.2"
|
||||
git_commit: "5f6516e9c0d95c84f3d4159a67d3de19d3ae1fde"
|
||||
git_remote: "https://github.com/viash-hub/biobox"
|
||||
git_tag: "v0.2.0-30-g5f6516e"
|
||||
package_config:
|
||||
name: "biobox"
|
||||
version: "main"
|
||||
description: "A collection of bioinformatics tools for working with sequence data.\n"
|
||||
info: null
|
||||
viash_version: "0.9.0"
|
||||
viash_version: "0.9.2"
|
||||
source: "src"
|
||||
target: "target"
|
||||
config_mods:
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
// star_align_reads main
|
||||
//
|
||||
// This wrapper script is auto-generated by viash 0.9.0 and is thus a derivative
|
||||
// This wrapper script is auto-generated by viash 0.9.2 and is thus a derivative
|
||||
// work thereof. This software comes with ABSOLUTELY NO WARRANTY from Data
|
||||
// Intuitive.
|
||||
//
|
||||
@@ -177,7 +177,7 @@ def _checkArgumentType(String stage, Map par, Object value, String errorIdentifi
|
||||
Map _processInputValues(Map inputs, Map config, String id, String key) {
|
||||
if (!workflow.stubRun) {
|
||||
config.allArguments.each { arg ->
|
||||
if (arg.required) {
|
||||
if (arg.required && arg.direction == "input") {
|
||||
assert inputs.containsKey(arg.plainName) && inputs.get(arg.plainName) != null :
|
||||
"Error in module '${key}' id '${id}': required input argument '${arg.plainName}' is missing"
|
||||
}
|
||||
@@ -196,15 +196,8 @@ Map _processInputValues(Map inputs, Map config, String id, String key) {
|
||||
}
|
||||
|
||||
// helper file: 'src/main/resources/io/viash/runners/nextflow/arguments/_processOutputValues.nf'
|
||||
Map _processOutputValues(Map outputs, Map config, String id, String key) {
|
||||
Map _checkValidOutputArgument(Map outputs, Map config, String id, String key) {
|
||||
if (!workflow.stubRun) {
|
||||
config.allArguments.each { arg ->
|
||||
if (arg.direction == "output" && arg.required) {
|
||||
assert outputs.containsKey(arg.plainName) && outputs.get(arg.plainName) != null :
|
||||
"Error in module '${key}' id '${id}': required output argument '${arg.plainName}' is missing"
|
||||
}
|
||||
}
|
||||
|
||||
outputs = outputs.collectEntries { name, value ->
|
||||
def par = config.allArguments.find { it.plainName == name && it.direction == "output" }
|
||||
assert par != null : "Error in module '${key}' id '${id}': '${name}' is not a valid output argument"
|
||||
@@ -217,6 +210,16 @@ Map _processOutputValues(Map outputs, Map config, String id, String key) {
|
||||
return outputs
|
||||
}
|
||||
|
||||
void _checkAllRequiredOuputsPresent(Map outputs, Map config, String id, String key) {
|
||||
if (!workflow.stubRun) {
|
||||
config.allArguments.each { arg ->
|
||||
if (arg.direction == "output" && arg.required) {
|
||||
assert outputs.containsKey(arg.plainName) && outputs.get(arg.plainName) != null :
|
||||
"Error in module '${key}' id '${id}': required output argument '${arg.plainName}' is missing"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// helper file: 'src/main/resources/io/viash/runners/nextflow/channel/IDChecker.nf'
|
||||
class IDChecker {
|
||||
final def items = [] as Set
|
||||
@@ -1670,6 +1673,162 @@ def joinStates(Closure apply_) {
|
||||
}
|
||||
return joinStatesWf
|
||||
}
|
||||
// helper file: 'src/main/resources/io/viash/runners/nextflow/states/publishFiles.nf'
|
||||
def publishFiles(Map args) {
|
||||
def key_ = args.get("key")
|
||||
|
||||
assert key_ != null : "publishFiles: key must be specified"
|
||||
|
||||
workflow publishFilesWf {
|
||||
take: input_ch
|
||||
main:
|
||||
input_ch
|
||||
| map { tup ->
|
||||
def id_ = tup[0]
|
||||
def state_ = tup[1]
|
||||
|
||||
// the input files and the target output filenames
|
||||
def inputoutputFilenames_ = collectInputOutputPaths(state_, id_ + "." + key_).transpose()
|
||||
def inputFiles_ = inputoutputFilenames_[0]
|
||||
def outputFilenames_ = inputoutputFilenames_[1]
|
||||
|
||||
[id_, inputFiles_, outputFilenames_]
|
||||
}
|
||||
| publishFilesProc
|
||||
emit: input_ch
|
||||
}
|
||||
return publishFilesWf
|
||||
}
|
||||
|
||||
process publishFilesProc {
|
||||
// todo: check publishpath?
|
||||
publishDir path: "${getPublishDir()}/", mode: "copy"
|
||||
tag "$id"
|
||||
input:
|
||||
tuple val(id), path(inputFiles, stageAs: "_inputfile?/*"), val(outputFiles)
|
||||
output:
|
||||
tuple val(id), path{outputFiles}
|
||||
script:
|
||||
def copyCommands = [
|
||||
inputFiles instanceof List ? inputFiles : [inputFiles],
|
||||
outputFiles instanceof List ? outputFiles : [outputFiles]
|
||||
]
|
||||
.transpose()
|
||||
.collectMany{infile, outfile ->
|
||||
if (infile.toString() != outfile.toString()) {
|
||||
[
|
||||
"[ -d \"\$(dirname '${outfile.toString()}')\" ] || mkdir -p \"\$(dirname '${outfile.toString()}')\"",
|
||||
"cp -r '${infile.toString()}' '${outfile.toString()}'"
|
||||
]
|
||||
} else {
|
||||
// no need to copy if infile is the same as outfile
|
||||
[]
|
||||
}
|
||||
}
|
||||
"""
|
||||
echo "Copying output files to destination folder"
|
||||
${copyCommands.join("\n ")}
|
||||
"""
|
||||
}
|
||||
|
||||
|
||||
// this assumes that the state contains no other values other than those specified in the config
|
||||
def publishFilesByConfig(Map args) {
|
||||
def config = args.get("config")
|
||||
assert config != null : "publishFilesByConfig: config must be specified"
|
||||
|
||||
def key_ = args.get("key", config.name)
|
||||
assert key_ != null : "publishFilesByConfig: key must be specified"
|
||||
|
||||
workflow publishFilesSimpleWf {
|
||||
take: input_ch
|
||||
main:
|
||||
input_ch
|
||||
| map { tup ->
|
||||
def id_ = tup[0]
|
||||
def state_ = tup[1] // e.g. [output: new File("myoutput.h5ad"), k: 10]
|
||||
def origState_ = tup[2] // e.g. [output: '$id.$key.foo.h5ad']
|
||||
|
||||
|
||||
// the processed state is a list of [key, value, inputPath, outputFilename] tuples, where
|
||||
// - key is a String
|
||||
// - value is any object that can be serialized to a Yaml (so a String/Integer/Long/Double/Boolean, a List, a Map, or a Path)
|
||||
// - inputPath is a List[Path]
|
||||
// - outputFilename is a List[String]
|
||||
// - (inputPath, outputFilename) are the files that will be copied from src to dest (relative to the state.yaml)
|
||||
def processedState =
|
||||
config.allArguments
|
||||
.findAll { it.direction == "output" }
|
||||
.collectMany { par ->
|
||||
def plainName_ = par.plainName
|
||||
// if the state does not contain the key, it's an
|
||||
// optional argument for which the component did
|
||||
// not generate any output OR multiple channels were emitted
|
||||
// and the output was just not added to using the channel
|
||||
// that is now being parsed
|
||||
if (!state_.containsKey(plainName_)) {
|
||||
return []
|
||||
}
|
||||
def value = state_[plainName_]
|
||||
// if the parameter is not a file, it should be stored
|
||||
// in the state as-is, but is not something that needs
|
||||
// to be copied from the source path to the dest path
|
||||
if (par.type != "file") {
|
||||
return [[inputPath: [], outputFilename: []]]
|
||||
}
|
||||
// if the orig state does not contain this filename,
|
||||
// it's an optional argument for which the user specified
|
||||
// that it should not be returned as a state
|
||||
if (!origState_.containsKey(plainName_)) {
|
||||
return []
|
||||
}
|
||||
def filenameTemplate = origState_[plainName_]
|
||||
// if the pararameter is multiple: true, fetch the template
|
||||
if (par.multiple && filenameTemplate instanceof List) {
|
||||
filenameTemplate = filenameTemplate[0]
|
||||
}
|
||||
// instantiate the template
|
||||
def filename = filenameTemplate
|
||||
.replaceAll('\\$id', id_)
|
||||
.replaceAll('\\$\\{id\\}', id_)
|
||||
.replaceAll('\\$key', key_)
|
||||
.replaceAll('\\$\\{key\\}', key_)
|
||||
if (par.multiple) {
|
||||
// if the parameter is multiple: true, the filename
|
||||
// should contain a wildcard '*' that is replaced with
|
||||
// the index of the file
|
||||
assert filename.contains("*") : "Module '${key_}' id '${id_}': Multiple output files specified, but no wildcard '*' in the filename: ${filename}"
|
||||
def outputPerFile = value.withIndex().collect{ val, ix ->
|
||||
def filename_ix = filename.replace("*", ix.toString())
|
||||
def inputPath = val instanceof File ? val.toPath() : val
|
||||
[inputPath: inputPath, outputFilename: filename_ix]
|
||||
}
|
||||
def transposedOutputs = ["inputPath", "outputFilename"].collectEntries{ key ->
|
||||
[key, outputPerFile.collect{dic -> dic[key]}]
|
||||
}
|
||||
return [[key: plainName_] + transposedOutputs]
|
||||
} else {
|
||||
def value_ = java.nio.file.Paths.get(filename)
|
||||
def inputPath = value instanceof File ? value.toPath() : value
|
||||
return [[inputPath: [inputPath], outputFilename: [filename]]]
|
||||
}
|
||||
}
|
||||
|
||||
def inputPaths = processedState.collectMany{it.inputPath}
|
||||
def outputFilenames = processedState.collectMany{it.outputFilename}
|
||||
|
||||
|
||||
[id_, inputPaths, outputFilenames]
|
||||
}
|
||||
| publishFilesProc
|
||||
emit: input_ch
|
||||
}
|
||||
return publishFilesSimpleWf
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
// helper file: 'src/main/resources/io/viash/runners/nextflow/states/publishStates.nf'
|
||||
def collectFiles(obj) {
|
||||
if (obj instanceof java.io.File || obj instanceof Path) {
|
||||
@@ -1727,8 +1886,6 @@ def publishStates(Map args) {
|
||||
|
||||
// the input files and the target output filenames
|
||||
def inputoutputFilenames_ = collectInputOutputPaths(state_, id_ + "." + key_).transpose()
|
||||
def inputFiles_ = inputoutputFilenames_[0]
|
||||
def outputFilenames_ = inputoutputFilenames_[1]
|
||||
|
||||
def yamlFilename = yamlTemplate_
|
||||
.replaceAll('\\$id', id_)
|
||||
@@ -1741,7 +1898,7 @@ def publishStates(Map args) {
|
||||
// convert state to yaml blob
|
||||
def yamlBlob_ = toRelativeTaggedYamlBlob([id: id_] + state_, java.nio.file.Paths.get(yamlFilename))
|
||||
|
||||
[id_, yamlBlob_, yamlFilename, inputFiles_, outputFilenames_]
|
||||
[id_, yamlBlob_, yamlFilename]
|
||||
}
|
||||
| publishStatesProc
|
||||
emit: input_ch
|
||||
@@ -1753,33 +1910,17 @@ process publishStatesProc {
|
||||
publishDir path: "${getPublishDir()}/", mode: "copy"
|
||||
tag "$id"
|
||||
input:
|
||||
tuple val(id), val(yamlBlob), val(yamlFile), path(inputFiles, stageAs: "_inputfile?/*"), val(outputFiles)
|
||||
tuple val(id), val(yamlBlob), val(yamlFile)
|
||||
output:
|
||||
tuple val(id), path{[yamlFile] + outputFiles}
|
||||
tuple val(id), path{[yamlFile]}
|
||||
script:
|
||||
def copyCommands = [
|
||||
inputFiles instanceof List ? inputFiles : [inputFiles],
|
||||
outputFiles instanceof List ? outputFiles : [outputFiles]
|
||||
]
|
||||
.transpose()
|
||||
.collectMany{infile, outfile ->
|
||||
if (infile.toString() != outfile.toString()) {
|
||||
[
|
||||
"[ -d \"\$(dirname '${outfile.toString()}')\" ] || mkdir -p \"\$(dirname '${outfile.toString()}')\"",
|
||||
"cp -r '${infile.toString()}' '${outfile.toString()}'"
|
||||
]
|
||||
} else {
|
||||
// no need to copy if infile is the same as outfile
|
||||
[]
|
||||
}
|
||||
}
|
||||
"""
|
||||
mkdir -p "\$(dirname '${yamlFile}')"
|
||||
echo "Storing state as yaml"
|
||||
echo '${yamlBlob}' > '${yamlFile}'
|
||||
echo "Copying output files to destination folder"
|
||||
${copyCommands.join("\n ")}
|
||||
"""
|
||||
mkdir -p "\$(dirname '${yamlFile}')"
|
||||
echo "Storing state as yaml"
|
||||
cat > '${yamlFile}' << HERE
|
||||
${yamlBlob}
|
||||
HERE
|
||||
"""
|
||||
}
|
||||
|
||||
|
||||
@@ -1810,13 +1951,10 @@ def publishStatesByConfig(Map args) {
|
||||
.replaceAll('\\$\\{key\\}', key_)
|
||||
def yamlDir = java.nio.file.Paths.get(yamlFilename).getParent()
|
||||
|
||||
// the processed state is a list of [key, value, inputPath, outputFilename] tuples, where
|
||||
// the processed state is a list of [key, value] tuples, where
|
||||
// - key is a String
|
||||
// - value is any object that can be serialized to a Yaml (so a String/Integer/Long/Double/Boolean, a List, a Map, or a Path)
|
||||
// - inputPath is a List[Path]
|
||||
// - outputFilename is a List[String]
|
||||
// - (key, value) are the tuples that will be saved to the state.yaml file
|
||||
// - (inputPath, outputFilename) are the files that will be copied from src to dest (relative to the state.yaml)
|
||||
def processedState =
|
||||
config.allArguments
|
||||
.findAll { it.direction == "output" }
|
||||
@@ -1833,7 +1971,7 @@ def publishStatesByConfig(Map args) {
|
||||
// in the state as-is, but is not something that needs
|
||||
// to be copied from the source path to the dest path
|
||||
if (par.type != "file") {
|
||||
return [[key: plainName_, value: value, inputPath: [], outputFilename: []]]
|
||||
return [[key: plainName_, value: value]]
|
||||
}
|
||||
// if the orig state does not contain this filename,
|
||||
// it's an optional argument for which the user specified
|
||||
@@ -1864,13 +2002,9 @@ def publishStatesByConfig(Map args) {
|
||||
if (yamlDir != null) {
|
||||
value_ = yamlDir.relativize(value_)
|
||||
}
|
||||
def inputPath = val instanceof File ? val.toPath() : val
|
||||
[value: value_, inputPath: inputPath, outputFilename: filename_ix]
|
||||
return value_
|
||||
}
|
||||
def transposedOutputs = ["value", "inputPath", "outputFilename"].collectEntries{ key ->
|
||||
[key, outputPerFile.collect{dic -> dic[key]}]
|
||||
}
|
||||
return [[key: plainName_] + transposedOutputs]
|
||||
return [["key": plainName_, "value": outputPerFile]]
|
||||
} else {
|
||||
def value_ = java.nio.file.Paths.get(filename)
|
||||
// if id contains a slash
|
||||
@@ -1878,18 +2012,17 @@ def publishStatesByConfig(Map args) {
|
||||
value_ = yamlDir.relativize(value_)
|
||||
}
|
||||
def inputPath = value instanceof File ? value.toPath() : value
|
||||
return [[key: plainName_, value: value_, inputPath: [inputPath], outputFilename: [filename]]]
|
||||
return [["key": plainName_, value: value_]]
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def updatedState_ = processedState.collectEntries{[it.key, it.value]}
|
||||
def inputPaths = processedState.collectMany{it.inputPath}
|
||||
def outputFilenames = processedState.collectMany{it.outputFilename}
|
||||
|
||||
// convert state to yaml blob
|
||||
def yamlBlob_ = toTaggedYamlBlob([id: id_] + updatedState_)
|
||||
|
||||
[id_, yamlBlob_, yamlFilename, inputPaths, outputFilenames]
|
||||
[id_, yamlBlob_, yamlFilename]
|
||||
}
|
||||
| publishStatesProc
|
||||
emit: input_ch
|
||||
@@ -2563,7 +2696,8 @@ def _debug(workflowArgs, debugKey) {
|
||||
def workflowFactory(Map args, Map defaultWfArgs, Map meta) {
|
||||
def workflowArgs = processWorkflowArgs(args, defaultWfArgs, meta)
|
||||
def key_ = workflowArgs["key"]
|
||||
|
||||
def multipleArgs = meta.config.allArguments.findAll{ it.multiple }.collect{it.plainName}
|
||||
|
||||
workflow workflowInstance {
|
||||
take: input_
|
||||
|
||||
@@ -2720,12 +2854,36 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) {
|
||||
}
|
||||
|
||||
// TODO: move some of the _meta.join_id wrangling to the safeJoin() function.
|
||||
def chInitialOutput = chArgsWithDefaults
|
||||
def chInitialOutputMulti = chArgsWithDefaults
|
||||
| _debug(workflowArgs, "processed")
|
||||
// run workflow
|
||||
| innerWorkflowFactory(workflowArgs)
|
||||
// check output tuple
|
||||
| map { id_, output_ ->
|
||||
def chInitialOutputList = chInitialOutputMulti instanceof List ? chInitialOutputMulti : [chInitialOutputMulti]
|
||||
assert chInitialOutputList.size() > 0: "should have emitted at least one output channel"
|
||||
// Add a channel ID to the events, which designates the channel the event was emitted from as a running number
|
||||
// This number is used to sort the events later when the events are gathered from across the channels.
|
||||
def chInitialOutputListWithIndexedEvents = chInitialOutputList.withIndex().collect{channel, channelIndex ->
|
||||
def newChannel = channel
|
||||
| map {tuple ->
|
||||
assert tuple instanceof List :
|
||||
"Error in module '${key_}': element in output channel should be a tuple [id, data, ...otherargs...]\n" +
|
||||
" Example: [\"id\", [input: file('foo.txt'), arg: 10]].\n" +
|
||||
" Expected class: List. Found: tuple.getClass() is ${tuple.getClass()}"
|
||||
|
||||
def newEvent = [channelIndex] + tuple
|
||||
return newEvent
|
||||
}
|
||||
return newChannel
|
||||
}
|
||||
// Put the events into 1 channel, cover case where there is only one channel is emitted
|
||||
def chInitialOutput = chInitialOutputList.size() > 1 ? \
|
||||
chInitialOutputListWithIndexedEvents[0].mix(*chInitialOutputListWithIndexedEvents.tail()) : \
|
||||
chInitialOutputListWithIndexedEvents[0]
|
||||
def chInitialOutputProcessed = chInitialOutput
|
||||
| map { tuple ->
|
||||
def channelId = tuple[0]
|
||||
def id_ = tuple[1]
|
||||
def output_ = tuple[2]
|
||||
|
||||
// see if output map contains metadata
|
||||
def meta_ =
|
||||
@@ -2738,19 +2896,94 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) {
|
||||
output_ = output_.findAll{k, v -> k != "_meta"}
|
||||
|
||||
// check value types
|
||||
output_ = _processOutputValues(output_, meta.config, id_, key_)
|
||||
output_ = _checkValidOutputArgument(output_, meta.config, id_, key_)
|
||||
|
||||
// simplify output if need be
|
||||
if (workflowArgs.auto.simplifyOutput && output_.size() == 1) {
|
||||
output_ = output_.values()[0]
|
||||
}
|
||||
|
||||
[join_id, id_, output_]
|
||||
[join_id, channelId, id_, output_]
|
||||
}
|
||||
// | view{"chInitialOutput: ${it.take(3)}"}
|
||||
|
||||
// join the output [prev_id, channel_id, new_id, output] with the previous state [prev_id, state, ...]
|
||||
def chPublishWithPreviousState = safeJoin(chInitialOutputProcessed, chRunFiltered, key_)
|
||||
// input tuple format: [join_id, channel_id, id, output, prev_state, ...]
|
||||
// output tuple format: [join_id, channel_id, id, new_state, ...]
|
||||
| map{ tup ->
|
||||
def new_state = workflowArgs.toState(tup.drop(2).take(3))
|
||||
tup.take(3) + [new_state] + tup.drop(5)
|
||||
}
|
||||
if (workflowArgs.auto.publish == "state") {
|
||||
def chPublishFiles = chPublishWithPreviousState
|
||||
// input tuple format: [join_id, channel_id, id, new_state, ...]
|
||||
// output tuple format: [join_id, channel_id, id, new_state]
|
||||
| map{ tup ->
|
||||
tup.take(4)
|
||||
}
|
||||
|
||||
safeJoin(chPublishFiles, chArgsWithDefaults, key_)
|
||||
// input tuple format: [join_id, channel_id, id, new_state, orig_state, ...]
|
||||
// output tuple format: [id, new_state, orig_state]
|
||||
| map { tup ->
|
||||
tup.drop(2).take(3)
|
||||
}
|
||||
| publishFilesByConfig(key: key_, config: meta.config)
|
||||
}
|
||||
// Join the state from the events that were emitted from different channels
|
||||
def chJoined = chInitialOutputProcessed
|
||||
| map {tuple ->
|
||||
def join_id = tuple[0]
|
||||
def channel_id = tuple[1]
|
||||
def id = tuple[2]
|
||||
def other = tuple.drop(3)
|
||||
// Below, groupTuple is used to join the events. To make sure resuming a workflow
|
||||
// keeps working, the output state must be deterministic. This means the state needs to be
|
||||
// sorted with groupTuple's has a 'sort' argument. This argument can be set to 'hash',
|
||||
// but hashing the state when it is large can be problematic in terms of performance.
|
||||
// Therefore, a custom comparator function is provided. We add the channel ID to the
|
||||
// states so that we can use the channel ID to sort the items.
|
||||
def stateWithChannelID = [[channel_id] * other.size(), other].transpose()
|
||||
// A comparator that is provided to groupTuple's 'sort' argument is applied
|
||||
// to all elements of the event tuple (that is not the 'id'). The comparator
|
||||
// closure that is used below expects the input to be List. So the join_id and
|
||||
// channel_id must also be wrapped in a list.
|
||||
[[join_id], [channel_id], id] + stateWithChannelID
|
||||
}
|
||||
| groupTuple(by: 2, sort: {a, b -> a[0] <=> b[0]}, size: chInitialOutputList.size(), remainder: true)
|
||||
| map {join_ids, _, id, statesWithChannelID ->
|
||||
// Remove the channel IDs from the states
|
||||
def states = statesWithChannelID.collect{it[1]}
|
||||
def newJoinId = join_ids.flatten().unique{a, b -> a <=> b}
|
||||
assert newJoinId.size() == 1: "Multiple events were emitted for '$id'."
|
||||
def newJoinIdUnique = newJoinId[0]
|
||||
|
||||
// Merge the states from the different channels
|
||||
def newState = states.inject([:]){ old_state, state_to_add ->
|
||||
return old_state + state_to_add.collectEntries{k, v ->
|
||||
if (!multipleArgs.contains(k)) {
|
||||
// if the key is not a multiple argument, we expect only one value
|
||||
if (old_state.containsKey(k)) {
|
||||
assert old_state[k] == v : "ID $id: multiple entries for argument $k were emitted."
|
||||
}
|
||||
[k, v]
|
||||
} else {
|
||||
// if the key is a multiple argument, append the different values into one list
|
||||
def prevValue = old_state.getOrDefault(k, [])
|
||||
def prevValueAsList = prevValue instanceof List ? prevValue : [prevValue]
|
||||
[k, prevValueAsList + v]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_checkAllRequiredOuputsPresent(newState, meta.config, id, key_)
|
||||
|
||||
// simplify output if need be
|
||||
if (workflowArgs.auto.simplifyOutput && newState.size() == 1) {
|
||||
newState = newState.values()[0]
|
||||
}
|
||||
|
||||
return [newJoinIdUnique, id, newState]
|
||||
}
|
||||
|
||||
// join the output [prev_id, new_id, output] with the previous state [prev_id, state, ...]
|
||||
def chNewState = safeJoin(chInitialOutput, chRunFiltered, key_)
|
||||
def chNewState = safeJoin(chJoined, chRunFiltered, key_)
|
||||
// input tuple format: [join_id, id, output, prev_state, ...]
|
||||
// output tuple format: [join_id, id, new_state, ...]
|
||||
| map{ tup ->
|
||||
@@ -2759,23 +2992,21 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) {
|
||||
}
|
||||
|
||||
if (workflowArgs.auto.publish == "state") {
|
||||
def chPublish = chNewState
|
||||
def chPublishStates = chNewState
|
||||
// input tuple format: [join_id, id, new_state, ...]
|
||||
// output tuple format: [join_id, id, new_state]
|
||||
| map{ tup ->
|
||||
tup.take(3)
|
||||
}
|
||||
|
||||
safeJoin(chPublish, chArgsWithDefaults, key_)
|
||||
safeJoin(chPublishStates, chArgsWithDefaults, key_)
|
||||
// input tuple format: [join_id, id, new_state, orig_state, ...]
|
||||
// output tuple format: [id, new_state, orig_state]
|
||||
| map { tup ->
|
||||
tup.drop(1).take(3)
|
||||
}
|
||||
}
|
||||
| publishStatesByConfig(key: key_, config: meta.config)
|
||||
}
|
||||
|
||||
// remove join_id and meta
|
||||
chReturn = chNewState
|
||||
| map { tup ->
|
||||
// input tuple format: [join_id, id, new_state, ...]
|
||||
@@ -5794,6 +6025,10 @@ meta = [
|
||||
}
|
||||
],
|
||||
"status" : "enabled",
|
||||
"scope" : {
|
||||
"image" : "public",
|
||||
"target" : "public"
|
||||
},
|
||||
"requirements" : {
|
||||
"commands" : [
|
||||
"ps"
|
||||
@@ -5942,16 +6177,16 @@ meta = [
|
||||
"runner" : "nextflow",
|
||||
"engine" : "docker|native",
|
||||
"output" : "target/nextflow/star/star_align_reads",
|
||||
"viash_version" : "0.9.0",
|
||||
"git_commit" : "952ff0843093b538cbfd6fefdecf2e7a0bc9e70b",
|
||||
"git_remote" : "https://x-access-token:ghs_EwAUAMYJ0K4VBHlAEMs4ZP2OyQYqJM0PSfEO@github.com/viash-hub/biobox",
|
||||
"git_tag" : "v0.2.0-27-g952ff08"
|
||||
"viash_version" : "0.9.2",
|
||||
"git_commit" : "5f6516e9c0d95c84f3d4159a67d3de19d3ae1fde",
|
||||
"git_remote" : "https://github.com/viash-hub/biobox",
|
||||
"git_tag" : "v0.2.0-30-g5f6516e"
|
||||
},
|
||||
"package_config" : {
|
||||
"name" : "biobox",
|
||||
"version" : "main",
|
||||
"description" : "A collection of bioinformatics tools for working with sequence data.\n",
|
||||
"viash_version" : "0.9.0",
|
||||
"viash_version" : "0.9.2",
|
||||
"source" : "src",
|
||||
"target" : "target",
|
||||
"config_mods" : [
|
||||
|
||||
@@ -332,7 +332,7 @@ build_info:
|
||||
output: "target/nextflow/demultiplex_htrnaseq"
|
||||
executable: "target/nextflow/demultiplex_htrnaseq/main.nf"
|
||||
viash_version: "0.9.0-RC6"
|
||||
git_commit: "622c1117f5ad07eeea508dfcc0b831d3c71114fb"
|
||||
git_commit: "4dc3a874d291722154981aef8131c8048d2ac945"
|
||||
git_remote: "https://github.com/viash-hub/playground"
|
||||
dependencies:
|
||||
- "target/dependencies/vsh/vsh/demultiplex/v0.3.4/nextflow/demultiplex"
|
||||
|
||||
@@ -3182,7 +3182,7 @@ meta = [
|
||||
"engine" : "native",
|
||||
"output" : "target/nextflow/demultiplex_htrnaseq",
|
||||
"viash_version" : "0.9.0-RC6",
|
||||
"git_commit" : "622c1117f5ad07eeea508dfcc0b831d3c71114fb",
|
||||
"git_commit" : "4dc3a874d291722154981aef8131c8048d2ac945",
|
||||
"git_remote" : "https://github.com/viash-hub/playground"
|
||||
},
|
||||
"package_config" : {
|
||||
|
||||
@@ -167,7 +167,7 @@ build_info:
|
||||
output: "target/nextflow/mapping_and_qc"
|
||||
executable: "target/nextflow/mapping_and_qc/main.nf"
|
||||
viash_version: "0.9.0-RC6"
|
||||
git_commit: "622c1117f5ad07eeea508dfcc0b831d3c71114fb"
|
||||
git_commit: "4dc3a874d291722154981aef8131c8048d2ac945"
|
||||
git_remote: "https://github.com/viash-hub/playground"
|
||||
dependencies:
|
||||
- "target/dependencies/vsh/vsh/biobox/main/nextflow/cutadapt"
|
||||
|
||||
@@ -2997,7 +2997,7 @@ meta = [
|
||||
"engine" : "native|native",
|
||||
"output" : "target/nextflow/mapping_and_qc",
|
||||
"viash_version" : "0.9.0-RC6",
|
||||
"git_commit" : "622c1117f5ad07eeea508dfcc0b831d3c71114fb",
|
||||
"git_commit" : "4dc3a874d291722154981aef8131c8048d2ac945",
|
||||
"git_remote" : "https://github.com/viash-hub/playground"
|
||||
},
|
||||
"package_config" : {
|
||||
|
||||
Reference in New Issue
Block a user