Files
htrnaseq/target/nextflow/parallel_map/main.nf
CI e6f4877f17 Build branch main with version main (c7c8471)
Build pipeline: viash-hub.htrnaseq.main-dwqr2

Source commit: c7c84719b5

Source message: Update test resources (#47)
2025-05-14 08:50:22 +00:00

4290 lines
155 KiB
Plaintext

// parallel_map main
//
// This wrapper script is auto-generated by viash 0.9.4 and is thus a derivative
// work thereof. This software comes with ABSOLUTELY NO WARRANTY from Data
// Intuitive.
//
// The component may contain files which fall under a different license. The
// authors of this component should specify the license in the header of such
// files, or include a separate license file detailing the licenses of all included
// files.
//
// Component authors:
// * Dries Schaumont (maintainer)
// * Toni Verbeiren (author, maintainer)
////////////////////////////
// VDSL3 helper functions //
////////////////////////////
// helper file: 'src/main/resources/io/viash/runners/nextflow/arguments/_checkArgumentType.nf'
/**
 * Thrown when an argument value does not have the type declared for it.
 *
 * Besides the formatted message, the individual pieces (identifier, stage,
 * argument name, expected and found class names) are kept as properties so
 * that callers can re-wrap them (e.g. as "List[...]").
 */
class UnexpectedArgumentTypeException extends Exception {
  String errorIdentifier
  String stage
  String plainName
  String expectedClass
  String foundClass

  // errorIdentifier is typically built as:
  // ${key ? " in module '$key'" : ""}${id ? " id '$id'" : ""}
  UnexpectedArgumentTypeException(String errorIdentifier, String stage, String plainName, String expectedClass, String foundClass) {
    // identifier and stage are optional; each is prefixed by a space when present
    super(
      "Error" + (errorIdentifier ? " " + errorIdentifier : "") + ":" +
      (stage ? " " + stage : "") +
      " argument '" + plainName + "' has the wrong type. " +
      "Expected type: " + expectedClass + ". Found type: " + foundClass
    )
    this.errorIdentifier = errorIdentifier
    this.stage = stage
    this.plainName = plainName
    this.expectedClass = expectedClass
    this.foundClass = foundClass
  }
}
/**
 * Checks if the given value is of the expected type. If not, an exception is thrown.
 *
 * Where a conversion is attempted (GString -> String, numeric/boolean casts,
 * String/File -> Path), the converted value is returned. Any failure of such a
 * conversion is reported as an UnexpectedArgumentTypeException.
 *
 * @param stage The stage of the argument (input or output)
 * @param par The parameter definition
 * @param value The value to check
 * @param errorIdentifier The identifier to use in the error message
 * @return The value, if it is of the expected type
 * @throws UnexpectedArgumentTypeException If the value is not of the expected type
 */
def _checkArgumentType(String stage, Map par, Object value, String errorIdentifier) {
  // expectedClass will only be != null if value is not of the expected type
  def expectedClass = null
  def foundClass = null

  // todo: split if need be

  if (!par.required && value == null) {
    // optional arguments may be left unset
    expectedClass = null
  } else if (par.multiple) {
    if (value !instanceof Collection) {
      value = [value]
    }

    // split strings
    value = value.collectMany{ val ->
      if (val instanceof String) {
        // collect() to ensure that the result is a List and not simply an array
        val.split(par.multiple_sep).collect()
      } else {
        [val]
      }
    }

    // process globs
    if (par.type == "file" && par.direction == "input") {
      value = value.collect{ it instanceof String ? file(it, hidden: true) : it }.flatten()
    }

    // check types of elements in list by re-checking each one as a single value
    try {
      value = value.collect { listVal ->
        _checkArgumentType(stage, par + [multiple: false], listVal, errorIdentifier)
      }
    } catch (UnexpectedArgumentTypeException e) {
      expectedClass = "List[${e.expectedClass}]"
      foundClass = "List[${e.foundClass}]"
    }
  } else if (par.type == "string") {
    // cast to string if need be. only cast if the value is a GString
    if (value instanceof GString) {
      value = value as String
    }
    expectedClass = value instanceof String ? null : "String"
  } else if (par.type == "integer") {
    // cast to integer if need be
    if (value !instanceof Integer) {
      try {
        value = value as Integer
      } catch (Exception e) {
        // covers unparsable strings as well as values that cannot be cast at all
        expectedClass = "Integer"
      }
    }
  } else if (par.type == "long") {
    // cast to long if need be
    if (value !instanceof Long) {
      try {
        value = value as Long
      } catch (Exception e) {
        expectedClass = "Long"
      }
    }
  } else if (par.type == "double") {
    // cast to double if need be
    if (value !instanceof Double) {
      try {
        value = value as Double
      } catch (Exception e) {
        expectedClass = "Double"
      }
    }
  } else if (par.type == "float") {
    // cast to float if need be
    if (value !instanceof Float) {
      try {
        value = value as Float
      } catch (Exception e) {
        expectedClass = "Float"
      }
    }
  } else if (par.type == "boolean" || par.type == "boolean_true" || par.type == "boolean_false") {
    // cast to boolean if need be
    if (value !instanceof Boolean) {
      try {
        value = value as Boolean
      } catch (Exception e) {
        expectedClass = "Boolean"
      }
    }
  } else if (par.type == "file" && (par.direction == "input" || stage == "output")) {
    // cast to path if need be
    if (value instanceof String) {
      value = file(value, hidden: true)
    }
    if (value instanceof File) {
      value = value.toPath()
    }
    expectedClass = value instanceof Path ? null : "Path"
  } else if (par.type == "file" && stage == "input" && par.direction == "output") {
    // output file arguments passed as *input* are kept as strings
    // cast to string if need be
    if (value !instanceof String) {
      try {
        value = value as String
      } catch (Exception e) {
        expectedClass = "String"
      }
    }
  } else {
    // didn't find a match for par.type
    expectedClass = par.type
  }

  if (expectedClass != null) {
    // the list branch sets foundClass itself; otherwise report the runtime class
    if (foundClass == null) {
      foundClass = value.getClass().getName()
    }
    throw new UnexpectedArgumentTypeException(errorIdentifier, stage, par.plainName, expectedClass, foundClass)
  }

  return value
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/arguments/_processInputValues.nf'
/**
 * Validate the inputs handed to a module.
 *
 * Unless this is a stub run: asserts that every required input argument is
 * present and non-null, asserts that every provided key is a known input (or
 * file) argument, and type-checks each value through _checkArgumentType.
 *
 * @param inputs The provided input values, keyed by plain argument name
 * @param config The Viash config of the module
 * @param id The event id, used in error messages
 * @param key The module key, used in error messages
 * @return The (possibly converted) input values
 */
Map _processInputValues(Map inputs, Map config, String id, String key) {
  // stub runs skip all validation
  if (workflow.stubRun) {
    return inputs
  }

  for (arg in config.allArguments) {
    if (!(arg.required && arg.direction == "input")) {
      continue
    }
    assert inputs.containsKey(arg.plainName) && inputs.get(arg.plainName) != null :
      "Error in module '${key}' id '${id}': required input argument '${arg.plainName}' is missing"
  }

  return inputs.collectEntries { name, value ->
    def par = config.allArguments.find { it.plainName == name && (it.direction == "input" || it.type == "file") }
    assert par != null : "Error in module '${key}' id '${id}': '${name}' is not a valid input argument"
    def checked = _checkArgumentType("input", par, value, "in module '$key' id '$id'")
    [ name, checked ]
  }
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/arguments/_processOutputValues.nf'
/**
 * Validate the outputs produced by a module.
 *
 * Unless this is a stub run: asserts that every key is a declared output
 * argument and type-checks each value through _checkArgumentType.
 *
 * @param outputs The produced output values, keyed by plain argument name
 * @param config The Viash config of the module
 * @param id The event id, used in error messages
 * @param key The module key, used in error messages
 * @return The (possibly converted) output values
 */
Map _checkValidOutputArgument(Map outputs, Map config, String id, String key) {
  // stub runs skip all validation
  if (workflow.stubRun) {
    return outputs
  }

  return outputs.collectEntries { name, value ->
    def par = config.allArguments.find { it.plainName == name && it.direction == "output" }
    assert par != null : "Error in module '${key}' id '${id}': '${name}' is not a valid output argument"
    def checked = _checkArgumentType("output", par, value, "in module '$key' id '$id'")
    [ name, checked ]
  }
}
/**
 * Assert that every required output argument is present and non-null.
 * Nothing is checked during a stub run.
 *
 * @param outputs The produced output values, keyed by plain argument name
 * @param config The Viash config of the module
 * @param id The event id, used in error messages
 * @param key The module key, used in error messages
 */
void _checkAllRequiredOuputsPresent(Map outputs, Map config, String id, String key) {
  if (workflow.stubRun) {
    return
  }

  for (arg in config.allArguments) {
    if (!(arg.direction == "output" && arg.required)) {
      continue
    }
    assert outputs.containsKey(arg.plainName) && outputs.get(arg.plainName) != null :
      "Error in module '${key}' id '${id}': required output argument '${arg.plainName}' is missing"
  }
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/channel/IDChecker.nf'
/**
 * Records which ids have been seen so far.
 *
 * Mutating access is annotated with WithWriteLock and reading access with
 * WithReadLock.
 */
class IDChecker {
  final def items = [] as Set

  /**
   * Register an id.
   *
   * @return true if the id had not been observed before, false otherwise
   */
  @groovy.transform.WithWriteLock
  boolean observe(String item) {
    // Set.add reports whether the element was newly inserted
    return items.add(item)
  }

  /** @return true if the id has been observed before */
  @groovy.transform.WithReadLock
  boolean contains(String item) {
    return item in items
  }

  /** @return a copy of the ids observed so far */
  @groovy.transform.WithReadLock
  Set getItems() {
    def snapshot = items.clone()
    return snapshot
  }
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/channel/_checkUniqueIds.nf'
/**
 * Check if the ids are unique across parameter sets
 *
 * @param parameterSets a list of parameter sets; the first element of each
 *   entry is taken to be its id.
 * @throws AssertionError if an id occurs more than once. The message lists
 *   all detected ids, duplicates included.
 */
private void _checkUniqueIds(List<Tuple2<String, Map<String, Object>>> parameterSets) {
  def ppIds = parameterSets.collect{ it[0] }
  // unique(false) returns a deduplicated copy. Plain unique() would dedupe
  // ppIds in place, and the (lazily built) message below would then no longer
  // show which ids were duplicated.
  def distinctIds = ppIds.unique(false)
  assert ppIds.size() == distinctIds.size() : "All argument sets should have unique ids. Detected ids: $ppIds"
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/channel/_getChild.nf'
// helper functions for reading params from file //
/**
 * Resolve `child` relative to the directory containing `parent`.
 *
 * A child that contains "://" or that is an absolute path is returned as is.
 * Otherwise the last path segment of the absolute parent path is replaced by
 * the child.
 */
def _getChild(parent, child) {
  def standsOnItsOwn = child.contains("://") || java.nio.file.Paths.get(child).isAbsolute()
  if (standsOnItsOwn) {
    return child
  }
  def parentPath = java.nio.file.Paths.get(parent).toAbsolutePath().toString()
  // strip everything after the last '/', keeping the slash itself
  def parentDir = parentPath.replaceAll('/[^/]*$', "/")
  return parentDir + child
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/channel/_parseParamList.nf'
/**
 * Guess the format of a param list.
 *
 * Anything that is not a String is used as is ("asis"). For Strings the
 * format follows from the file extension; a String without a known extension
 * is treated as a yaml blob.
 *
 * @param param_list A String containing the path to the parameter list file
 *   (or a yaml blob), or an already parsed object.
 *
 * @return One of "asis", "csv", "json", "yaml" or "yaml_blob".
 */
def _paramListGuessFormat(param_list) {
  if (!(param_list instanceof String)) {
    return "asis"
  }
  def formatBySuffix = [
    ".csv": "csv",
    ".json": "json",
    ".jsn": "json",
    ".yaml": "yaml",
    ".yml": "yaml"
  ]
  def hit = formatBySuffix.find { suffix, format -> param_list.endsWith(suffix) }
  return hit != null ? hit.value : "yaml_blob"
}
/**
 * Read the param list
 *
 * @param param_list One of the following:
 *   - A String containing the path to the parameter list file (csv, json or yaml),
 *   - A yaml blob of a list of maps (yaml_blob),
 *   - Or a groovy list of maps (asis).
 * @param config A Map of the Viash configuration.
 *
 * @return A List of [id, Map] pairs, one per parameter set. The id is taken
 *   from the 'id' entry of each map.
 */
def _parseParamList(param_list, Map config) {
  // first determine format by extension
  def paramListFormat = _paramListGuessFormat(param_list)

  // only csv/json/yaml refer to an actual file
  def readsFromFile = !(paramListFormat in ["asis", "yaml_blob"])
  def paramListPath = readsFromFile ? file(param_list, hidden: true) : null

  // dispatch to the reader matching the detected format
  def paramSets = []
  switch (paramListFormat) {
    case "asis":
      paramSets = param_list
      break
    case "yaml_blob":
      paramSets = readYamlBlob(param_list)
      break
    case "yaml":
      paramSets = readYaml(paramListPath)
      break
    case "json":
      paramSets = readJson(paramListPath)
      break
    case "csv":
      paramSets = readCsv(paramListPath)
      break
    default:
      error(
        "Format of provided --param_list not recognised.\n" +
        "Found: '$paramListFormat'.\n" +
        "Expected: a csv file, a json file, a yaml file,\n" +
        "a yaml blob or a groovy list of maps."
      )
  }

  // data checks
  assert paramSets instanceof List: "--param_list should contain a list of maps"
  paramSets.each { entry ->
    assert entry instanceof Map: "--param_list should contain a list of maps"
  }

  // 'id' stays in the data only when it is itself a declared argument
  def idIsArgument = config.allArguments.any{ it.plainName == "id" }

  // Reformat from List<Map> to List<[id, Map]> and split 'multiple: true' parameters
  paramSets = paramSets.collect { data ->
    def id = data.id
    def withoutId = idIsArgument ? data : data.findAll{ k, v -> k != "id" }
    [id, _splitParams(withoutId, config)]
  }

  // The paths of input files inside a param_list file may have been specified relatively to the
  // location of the param_list file. These paths must be made absolute.
  if (!paramListPath) {
    return paramSets
  }

  def resolveValue = { parValue ->
    if (parValue instanceof Collection) {
      return parValue.collectMany { path ->
        def x = _resolveSiblingIfNotAbsolute(path, paramListPath)
        x instanceof Collection ? x : [x]
      }
    }
    return _resolveSiblingIfNotAbsolute(parValue, paramListPath)
  }

  return paramSets.collect { id, data ->
    def resolvedData = data.collectEntries { parName, parValue ->
      def par = config.allArguments.find{ it.plainName == parName }
      def isInputFile = par && par.type == "file" && par.direction == "input"
      [parName, isInputFile ? resolveValue(parValue) : parValue]
    }
    [id, resolvedData]
  }
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/channel/_splitParams.nf'
/**
 * Split parameters for arguments that accept multiple values using their separator
 *
 * @param parValues A Map containing parameters to split.
 * @param config A Map of the Viash configuration. This Map can be generated from the config file
 *   using the readConfig() function.
 *
 * @return A Map of parameters where the values of 'multiple' arguments have been
 *   split into a list using their separator. Values of non-'multiple' arguments
 *   that arrive as a single-element collection are unwrapped. Parameters that
 *   are not found in the config are returned unchanged.
 */
Map<String, Object> _splitParams(Map<String, Object> parValues, Map config){
  return parValues.collectEntries { parName, parValue ->
    def settings = config.allArguments.find({ it.plainName == parName })

    // if argument is not found, do not alter
    if (!settings) {
      return [parName, parValue]
    }

    def result = parValue
    if (settings.multiple) {
      // normalise to a flat list, splitting strings on the separator
      def sep = settings.multiple_sep
      if (result instanceof Collection) {
        result = result.collect{ it instanceof String ? it.split(sep) : it }
      } else if (result instanceof String) {
        result = result.split(sep)
      } else if (result == null) {
        result = []
      } else {
        result = [ result ]
      }
      result = result.flatten()
    } else if (result instanceof Collection) {
      // Multiple values are only allowed for arguments that declare it.
      // Quietly simplify lists of length 1.
      assert result.size() == 1 :
        "Error: argument ${parName} has too many values.\n" +
        " Expected amount: 1. Found: ${result.size()}"
      result = result[0]
    }

    [parName, result]
  }
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/channel/channelFromParams.nf'
/**
 * Parse nextflow parameters based on settings defined in a viash config.
 * Return a list of parameter sets, each parameter set corresponding to
 * an event in a nextflow channel. The output from this function can be used
 * with Channel.fromList to create a nextflow channel with Vdsl3 formatted
 * events.
 *
 * This function performs:
 *   - A filtering of the params down to those which can be found in the config.
 *   - Processing of the param_list argument, which allows a user to initialise
 *     a Vdsl3 channel with multiple parameter sets (see _parseParamList for the
 *     accepted formats).
 *   - Combining the global params with each parameter set (entries of the
 *     parameter set take precedence) and type-checking the result.
 *   - A check that the resulting ids are unique.
 *
 * @param params Input parameters. Can optionally contain a 'param_list' key that
 *   provides a list of parameter sets. Each parameter set (event) must end up
 *   with an id, either its own or the global 'id' parameter.
 * @param config A Map of the Viash configuration. This Map can be generated from the config file
 *   using the readConfig() function.
 *
 * @return A list of parameter sets with the first element of each entry being
 *   the event ID and the second element containing a map of the parsed parameters.
 */
private List<Tuple2<String, Map<String, Object>>> _paramsToParamSets(Map params, Map config){
  // todo: fetch key from run args
  def key_ = config.name

  /* parse regular parameters (not in param_list)  */
  /*************************************************/
  def globalParams = config.allArguments
    .findAll { params.containsKey(it.plainName) }
    .collectEntries { [ it.plainName, params[it.plainName] ] }
  def globalID = params.get("id", null)

  /* process params_list arguments */
  /*********************************/
  def hasParamList = params.containsKey("param_list") && params.param_list != null
  def paramList = hasParamList ? params.param_list : []
  // TODO: be able to process param_list when it is a list of strings
  def paramSets = _parseParamList(paramList, config)
  if (paramSets.isEmpty()) {
    // no param_list: a single event built purely from the global params
    paramSets = [[null, [:]]]
  }

  /* combine arguments into channel */
  /**********************************/
  def processedParams = paramSets.indexed().collect{ index, tup ->
    // Process ID: the parameter set's own id wins over the global one
    def id = tup[0] ?: globalID
    if (workflow.stubRun && !id) {
      // if stub run, explicitly add an id if missing
      id = "stub${index}"
    }
    assert id != null: "Each parameter set should have at least an 'id'"

    // Process params: entries of the parameter set override the global ones
    def combined = globalParams + tup[1]
    def checked = combined.collectEntries { name, value ->
      def par = config.allArguments.find { it.plainName == name && (it.direction == "input" || it.type == "file") }
      assert par != null : "Error in module '${key_}' id '${id}': '${name}' is not a valid input argument"
      [ name, _checkArgumentType("input", par, value, "in module '$key_' id '$id'") ]
    }
    [id, checked]
  }

  // Check if ids (first element of each list) is unique
  _checkUniqueIds(processedParams)
  return processedParams
}
/**
 * Parse nextflow parameters based on settings defined in a viash config
 * and return a nextflow channel.
 *
 * @param params Input parameters. Can optionally contain a 'param_list' key that
 *   provides a list of parameter sets that are split up into multiple events
 *   in the output channel. Possible formats of a param_list are: a csv file,
 *   a json file, a yaml file or a yaml blob. Each parameter set (event) must
 *   have a unique ID.
 * @param config A Map of the Viash configuration. This Map can be generated from the config file
 *   using the readConfig() function.
 *
 * @return A nextflow Channel with events. Events are formatted as a tuple whose
 *   first element is the ID of the event and whose second element is a parameter map.
 */
def channelFromParams(Map params, Map config) {
  return Channel.fromList(_paramsToParamSets(params, config))
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/channel/checkUniqueIds.nf'
/**
 * Channel operator that checks that the id (first tuple element) of every
 * event is unique.
 *
 * @param args A map with the optional key 'stopOnError' (default: true).
 *   When true, a duplicate id raises an error. When false, a warning is
 *   logged and the duplicate event is removed from the channel.
 *
 * @return The result of applying `filter` with the duplicate check.
 */
def checkUniqueIds(Map args) {
  // Honour an explicitly passed value and fall back to true otherwise.
  // (The branches of this ternary used to be swapped, which turned an explicit
  // `false` into `true` and left the default at null.)
  def stopOnError = args.stopOnError != null ? args.stopOnError : true

  def idChecker = new IDChecker()

  return filter { tup ->
    if (!idChecker.observe(tup[0])) {
      if (stopOnError) {
        error "Duplicate id: ${tup[0]}"
      } else {
        log.warn "Duplicate id: ${tup[0]}, removing duplicate entry"
        return false
      }
    }
    return true
  }
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/channel/preprocessInputs.nf'
// This helper file will be deprecated soon
// script-level flag so that the warning below is printed only once
preprocessInputsDeprecationWarningPrinted = false

/** Print the preprocessInputs() deprecation warning to stderr, once. */
def preprocessInputsDeprecationWarning() {
  if (preprocessInputsDeprecationWarningPrinted) {
    return
  }
  preprocessInputsDeprecationWarningPrinted = true
  System.err.println("Warning: preprocessInputs() is deprecated and will be removed in Viash 0.9.0.")
}
/**
 * Generate a channel operator that processes Vdsl3 formatted events and
 * applies a Viash config to them:
 *   - Gather the default values declared in the Viash config.
 *   - Overlay the event's own parameters on top of those defaults.
 *   - Type-check every value that corresponds to an input (or file) argument;
 *     entries that match no such argument are passed through unchanged.
 *
 * The events in the channel are formatted as tuples, with the
 * first element of the tuples being a unique id of the parameter set,
 * and the second element containing the parameters themselves.
 * Optional extra elements of the tuples will be passed to the output as is.
 *
 * Calling this function prints a deprecation warning (once).
 *
 * @param args A map that must contain a 'config' key that points
 *   to a parsed config (see readConfig()). Optionally, a
 *   'key' key can be provided, which is used in error messages
 *   (defaults to the config name).
 *
 * @return The result of applying `map` with the processing described above.
 */
def preprocessInputs(Map args) {
  preprocessInputsDeprecationWarning()

  def config = args.config
  assert config instanceof Map :
    "Error in preprocessInputs: config must be a map. " +
    "Expected class: Map. Found: config.getClass() is ${config.getClass()}"
  def key_ = args.key ?: config.name

  // Default values declared in the config, keyed by plain argument name
  def argsWithDefault = config.allArguments.findAll { it.containsKey("default") }
  def defaultArgs = argsWithDefault.collectEntries { [ it.plainName, it.default ] }

  return map { tup ->
    def id = tup[0]
    def data = tup[1]
    def passthrough = tup.drop(2)

    // event values override the defaults
    def merged = defaultArgs + data
    def new_data = merged.collectEntries { name, value ->
      def par = config.allArguments.find { it.plainName == name && (it.direction == "input" || it.type == "file") }
      def checked = par != null ?
        _checkArgumentType("input", par, value, "in module '$key_' id '$id'") :
        value
      [ name, checked ]
    }

    [ id, new_data ] + passthrough
  }
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/channel/runComponents.nf'
/**
* Run a list of components on a stream of data.
*
* Deprecated: every call logs a warning advising to use runEach instead.
*
* For each component, the input channel is optionally filtered, the id is
* optionally rewritten, the state is mapped to the component's input using
* fromState, the component is run, and its output is mapped back onto the
* state using toState. The per-component output channels are then mixed.
*
* @param components: list of Viash VDSL3 modules to run
* (a single module is wrapped into a list)
* @param fromState: a closure, a map or a list of keys to extract from the input data.
* If a closure, it will be called with the id, the data and the component config.
* @param toState: a closure, a map or a list of keys to extract from the output data
* If a closure, it will be called with the id, the output data, the old state and the component config.
* @param filter: filter function to apply to the input.
* It will be called with the id, the data and the component config.
* @param id: id to use for the output data; either a String or a closure.
* If a closure, it will be called with the id, the data and the component config.
* @param auto: auto options to pass to the components
*
* @return: a workflow that runs the components
**/
def runComponents(Map args) {
log.warn("runComponents is deprecated, use runEach instead")
assert args.components: "runComponents should be passed a list of components to run"
def components_ = args.components
// accept a single component as well as a list of components
if (components_ !instanceof List) {
components_ = [ components_ ]
}
assert components_.size() > 0: "pass at least one component to runComponents"
def fromState_ = args.fromState
def toState_ = args.toState
def filter_ = args.filter
def id_ = args.id
workflow runComponentsWf {
take: input_ch
main:
// generate one channel per method
out_chs = components_.collect{ comp_ ->
def comp_config = comp_.config
// optional filtering of the events, per component
def filter_ch = filter_
? input_ch | filter{tup ->
filter_(tup[0], tup[1], comp_config)
}
: input_ch
// optional rewriting of the event id (fixed String or computed by a closure)
def id_ch = id_
? filter_ch | map{tup ->
// def new_id = id_(tup[0], tup[1], comp_config)
def new_id = tup[0]
if (id_ instanceof String) {
new_id = id_
} else if (id_ instanceof Closure) {
new_id = id_(new_id, tup[1], comp_config)
}
[new_id] + tup.drop(1)
}
: filter_ch
// derive the component's input from the state; if fromState is not a
// Map, List or Closure, the state is passed on unchanged
def data_ch = id_ch | map{tup ->
def new_data = tup[1]
if (fromState_ instanceof Map) {
new_data = fromState_.collectEntries{ key0, key1 ->
[key0, new_data[key1]]
}
} else if (fromState_ instanceof List) {
new_data = fromState_.collectEntries{ key ->
[key, new_data[key]]
}
} else if (fromState_ instanceof Closure) {
new_data = fromState_(tup[0], new_data, comp_config)
}
// insert the derived input right after the id; the original state
// (and any further elements) shift one position to the right
tup.take(1) + [new_data] + tup.drop(1)
}
def out_ch = data_ch
| comp_.run(
auto: (args.auto ?: [:]) + [simplifyInput: false, simplifyOutput: false]
)
// merge the component output back into the old state (found at index 2);
// without toState the component's output channel is used as is
def post_ch = toState_
? out_ch | map{tup ->
def output = tup[1]
def old_state = tup[2]
def new_state = null
if (toState_ instanceof Map) {
new_state = old_state + toState_.collectEntries{ key0, key1 ->
[key0, output[key1]]
}
} else if (toState_ instanceof List) {
new_state = old_state + toState_.collectEntries{ key ->
[key, output[key]]
}
} else if (toState_ instanceof Closure) {
new_state = toState_(tup[0], output, old_state, comp_config)
}
[tup[0], new_state] + tup.drop(3)
}
: out_ch
post_ch
}
// mix all results
output_ch =
(out_chs.size == 1)
? out_chs[0]
: out_chs[0].mix(*out_chs.drop(1))
emit: output_ch
}
return runComponentsWf
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/channel/runEach.nf'
/**
* Run a list of components on a stream of data.
*
* For each component, the input channel is optionally filtered, the id is
* optionally rewritten, and the events are optionally split by runIf into
* events to run and events to pass through untouched. For the events to run,
* the state is mapped to the component's input using fromState, the component
* is run, and its output is mapped back onto the state using toState. The
* passthrough events are concatenated to the processed ones, and the
* per-component output channels are mixed.
*
* @param components: list of Viash VDSL3 modules to run
* (a single module is wrapped into a list)
* @param fromState: a closure, a map or a list of keys to extract from the input data.
* If a closure, it will be called with the id, the data and the component itself.
* @param toState: a closure, a map or a list of keys to extract from the output data
* If a closure, it will be called with the id, the output data, the old state and the component itself.
* @param filter: filter function to apply to the input.
* It will be called with the id, the data and the component itself.
* @param runIf: optional closure deciding per event whether the component is run.
* It will be called with the id, the data and the component itself.
* @param id: id to use for the output data; a String or a closure returning a String.
* If a closure, it will be called with the id, the data and the component itself.
* @param auto: auto options to pass to the components
*
* @return: a workflow that runs the components
**/
def runEach(Map args) {
assert args.components: "runEach should be passed a list of components to run"
def components_ = args.components
// accept a single component as well as a list of components
if (components_ !instanceof List) {
components_ = [ components_ ]
}
assert components_.size() > 0: "pass at least one component to runEach"
def fromState_ = args.fromState
def toState_ = args.toState
def filter_ = args.filter
def runIf_ = args.runIf
def id_ = args.id
assert !runIf_ || runIf_ instanceof Closure: "runEach: must pass a Closure to runIf."
workflow runEachWf {
take: input_ch
main:
// generate one channel per method
out_chs = components_.collect{ comp_ ->
// optional filtering of the events, per component
def filter_ch = filter_
? input_ch | filter{tup ->
filter_(tup[0], tup[1], comp_)
}
: input_ch
// optional rewriting of the event id; the result must be a String
def id_ch = id_
? filter_ch | map{tup ->
def new_id = id_
if (new_id instanceof Closure) {
new_id = new_id(tup[0], tup[1], comp_)
}
assert new_id instanceof String : "Error in runEach: id should be a String or a Closure that returns a String. Expected: id instanceof String. Found: ${new_id.getClass()}"
[new_id] + tup.drop(1)
}
: filter_ch
// split the events into those the component runs on and those that skip it
def chPassthrough = null
def chRun = null
if (runIf_) {
def idRunIfBranch = id_ch.branch{ tup ->
run: runIf_(tup[0], tup[1], comp_)
passthrough: true
}
chPassthrough = idRunIfBranch.passthrough
chRun = idRunIfBranch.run
} else {
// without runIf every event is run and nothing is passed through
chRun = id_ch
chPassthrough = Channel.empty()
}
// derive the component's input from the state; if fromState is not a
// Map, List or Closure, the state is passed on unchanged
def data_ch = chRun | map{tup ->
def new_data = tup[1]
if (fromState_ instanceof Map) {
new_data = fromState_.collectEntries{ key0, key1 ->
[key0, new_data[key1]]
}
} else if (fromState_ instanceof List) {
new_data = fromState_.collectEntries{ key ->
[key, new_data[key]]
}
} else if (fromState_ instanceof Closure) {
new_data = fromState_(tup[0], new_data, comp_)
}
// insert the derived input right after the id; the original state
// (and any further elements) shift one position to the right
tup.take(1) + [new_data] + tup.drop(1)
}
def out_ch = data_ch
| comp_.run(
auto: (args.auto ?: [:]) + [simplifyInput: false, simplifyOutput: false]
)
// merge the component output back into the old state (found at index 2);
// without toState the component's output channel is used as is
def post_ch = toState_
? out_ch | map{tup ->
def output = tup[1]
def old_state = tup[2]
def new_state = null
if (toState_ instanceof Map) {
new_state = old_state + toState_.collectEntries{ key0, key1 ->
[key0, output[key1]]
}
} else if (toState_ instanceof List) {
new_state = old_state + toState_.collectEntries{ key ->
[key, output[key]]
}
} else if (toState_ instanceof Closure) {
new_state = toState_(tup[0], output, old_state, comp_)
}
[tup[0], new_state] + tup.drop(3)
}
: out_ch
// events that skipped the component are appended after the processed ones
def return_ch = post_ch
| concat(chPassthrough)
return_ch
}
// mix all results
output_ch =
(out_chs.size == 1)
? out_chs[0]
: out_chs[0].mix(*out_chs.drop(1))
emit: output_ch
}
return runEachWf
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/channel/safeJoin.nf'
/**
* Join sourceChannel to targetChannel
*
* This function joins the sourceChannel to the targetChannel.
* However, each id in the targetChannel must be present in the
* sourceChannel. If _meta.join_id exists in the targetChannel, that is
* used as an id instead. If the id doesn't match any id in the sourceChannel,
* an error is thrown.
*
* NOTE(review): as written, the id check described above is defined
* (`targetCheck`) but not part of the returned channel: the final `cross`
* is applied to `targetChannel` itself. Also, nothing in this function reads
* `_meta.join_id`; presumably the caller rewrites the id beforehand - confirm.
*
* @param targetChannel channel whose events are extended with source elements
* @param sourceChannel channel providing the elements to append
* @param key module key, only used in the error message
*
* @return a channel in which each emitted pair is merged as
* target event + source event without its first element
*/
def safeJoin(targetChannel, sourceChannel, key) {
def sourceIDs = new IDChecker()
// record every id that passes through the source channel
def sourceCheck = sourceChannel
| map { tup ->
sourceIDs.observe(tup[0])
tup
}
// NOTE(review): targetCheck is never referenced below, so this check does
// not run as part of the returned channel
def targetCheck = targetChannel
| map { tup ->
def id = tup[0]
if (!sourceIDs.contains(id)) {
error (
"Error in module '${key}' when merging output with original state.\n" +
" Reason: output with id '${id}' could not be joined with source channel.\n" +
" If the IDs in the output channel differ from the input channel,\n" +
" please set `tup[1]._meta.join_id to the original ID.\n" +
" Original IDs in input channel: ['${sourceIDs.getItems().join("', '")}'].\n" +
" Unexpected ID in the output channel: '${id}'.\n" +
" Example input event: [\"id\", [input: file(...)]],\n" +
" Example output event: [\"newid\", [output: file(...), _meta: [join_id: \"id\"]]]"
)
}
// TODO: add link to our documentation on how to fix this
tup
}
// left = source event, right = target event; drop the source id so that it
// is not duplicated in the merged event
sourceCheck.cross(targetChannel)
| map{ left, right ->
right + left.drop(1)
}
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/config/_processArgument.nf'
/**
 * Fill in the implicit settings of an argument definition, in place.
 *
 *   - multiple/required default to false, direction to "input" and
 *     multiple_sep to ";".
 *   - plainName is the argument name without its leading dashes.
 *   - file arguments get must_exist/create_parent defaulting to true.
 *   - output file arguments without a default get a templated default
 *     "$id.$key.<plainName>[_*][<ext>]", where the extension is taken from
 *     the example (if any) and "_*" is added for 'multiple' arguments.
 *   - for non-'multiple' arguments, list-valued default/example are reduced
 *     to their first element.
 *   - boolean_true / boolean_false arguments always get default false / true.
 *
 * @param arg The argument definition (a Map); it is modified.
 * @return The same argument definition.
 */
def _processArgument(arg) {
  arg.multiple = arg.multiple != null ? arg.multiple : false
  arg.required = arg.required != null ? arg.required : false
  arg.direction = arg.direction != null ? arg.direction : "input"
  arg.multiple_sep = arg.multiple_sep != null ? arg.multiple_sep : ";"
  arg.plainName = arg.name.replaceAll("^-*", "")

  if (arg.type == "file") {
    arg.must_exist = arg.must_exist != null ? arg.must_exist : true
    arg.create_parent = arg.create_parent != null ? arg.create_parent : true
  }

  // add default values to output files which haven't already got a default
  if (arg.type == "file" && arg.direction == "output" && arg.default == null) {
    def mult = arg.multiple ? "_*" : ""

    // arg.default is null in this branch, so only the example can tell us
    // which file extension to use
    def extSearch = arg.example != null ? arg.example : ""
    if (extSearch instanceof List) {
      // an empty example list simply yields no extension
      extSearch = extSearch.isEmpty() ? "" : extSearch[0]
    }
    // toString() so that non-String examples do not break the lookup
    def extSearchResult = extSearch != null ? extSearch.toString().find("\\.[^\\.]+\$") : null
    def ext = extSearchResult != null ? extSearchResult : ""

    arg.default = "\$id.\$key.${arg.plainName}${mult}${ext}"
    if (arg.multiple) {
      arg.default = [arg.default]
    }
  }

  if (!arg.multiple) {
    if (arg.default != null && arg.default instanceof List) {
      arg.default = arg.default[0]
    }
    if (arg.example != null && arg.example instanceof List) {
      arg.example = arg.example[0]
    }
  }

  if (arg.type == "boolean_true") {
    arg.default = false
  }
  if (arg.type == "boolean_false") {
    arg.default = true
  }

  arg
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/config/addGlobalParams.nf'
/**
 * Extend a config with the argument group holding the Nextflow-level
 * arguments (--publish_dir and --param_list) and process the merged result.
 *
 * @param config The Viash config to extend.
 * @return The result of processConfig() applied to the merged config.
 */
def addGlobalArguments(config) {
  def publishDirArg = [
    'name': '--publish_dir',
    'required': true,
    'type': 'string',
    'description': 'Path to an output directory.',
    'example': 'output/',
    'multiple': false
  ]

  def paramListArg = [
    'name': '--param_list',
    'required': false,
    'type': 'string',
    'description': '''Allows inputting multiple parameter sets to initialise a Nextflow channel. A `param_list` can either be a list of maps, a csv file, a json file, a yaml file, or simply a yaml blob.
|
|* A list of maps (as-is) where the keys of each map corresponds to the arguments of the pipeline. Example: in a `nextflow.config` file: `param_list: [ ['id': 'foo', 'input': 'foo.txt'], ['id': 'bar', 'input': 'bar.txt'] ]`.
|* A csv file should have column names which correspond to the different arguments of this pipeline. Example: `--param_list data.csv` with columns `id,input`.
|* A json or a yaml file should be a list of maps, each of which has keys corresponding to the arguments of the pipeline. Example: `--param_list data.json` with contents `[ {'id': 'foo', 'input': 'foo.txt'}, {'id': 'bar', 'input': 'bar.txt'} ]`.
|* A yaml blob can also be passed directly as a string. Example: `--param_list "[ {'id': 'foo', 'input': 'foo.txt'}, {'id': 'bar', 'input': 'bar.txt'} ]"`.
|
|When passing a csv, json or yaml file, relative path names are relativized to the location of the parameter file. No relativation is performed when `param_list` is a list of maps (as-is) or a yaml blob.'''.stripMargin(),
    'example': 'my_params.yaml',
    'multiple': false,
    'hidden': true
  ]

  // TODO: allow multiple: true in param_list?
  // TODO: allow to specify a --param_list_regex to filter the param_list?
  // TODO: allow to specify a --param_list_from_state to remap entries in the param_list?
  def nextflowGroup = [
    "name": "Nextflow input-output arguments",
    "description": "Input/output parameters for Nextflow itself. Please note that both publishDir and publish_dir are supported but at least one has to be configured.",
    "arguments" : [ publishDirArg, paramListArg ]
  ]

  def localConfig = [ "argument_groups": [ nextflowGroup ] ]

  return processConfig(_mergeMap(config, localConfig))
}
/**
 * Recursively merges `rhs` into a shallow copy of `lhs`.
 *
 * - When both sides hold a Map under the same key, the maps are merged recursively.
 * - When both sides hold a Collection, the right-hand items are appended.
 * - Otherwise the right-hand value replaces the left-hand one.
 *
 * @param lhs The base map (not modified at the top level).
 * @param rhs The map whose entries take precedence.
 * @return The merged map.
 */
def _mergeMap(Map lhs, Map rhs) {
  def merged = lhs.clone()
  rhs.each { key, incoming ->
    def existing = merged[key]
    if (existing instanceof Map && incoming instanceof Map) {
      merged[key] = _mergeMap(existing, incoming)
    } else if (existing instanceof Collection && incoming instanceof Collection) {
      // `+` builds a new collection, so the list held by lhs is left untouched
      merged[key] = existing + incoming
    } else {
      merged[key] = incoming
    }
  }
  return merged
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/config/generateHelp.nf'
/**
 * Renders the help text of a single argument.
 *
 * The output consists of the argument name, a list of named properties
 * (type, default, example, choices, min, max) and the wrapped description.
 *
 * Newlines and double quotes in values are escaped with literal (non-regex)
 * replacements. The previous regex replacements were wrong: the quote
 * replacement was a no-op and the newline replacement in the property list
 * produced a bare `n` instead of `\n`.
 *
 * @param param The processed argument definition, as a Map.
 * @return The help text for this argument, as a String.
 */
def _generateArgumentHelp(param) {
  // alternatives are not supported
  // def names = param.alternatives ::: List(param.name)

  def unnamedProps = [
    ["required parameter", param.required],
    ["multiple values allowed", param.multiple],
    ["output", param.direction.toLowerCase() == "output"],
    ["file must exist", param.type == "file" && param.must_exist]
  ].findAll{it[1]}.collect{it[0]}

  // lists are joined with the argument's own separator when one is defined
  def renderValue = { value ->
    if (value instanceof List) {
      return value.join(param.multiple_sep != null ? param.multiple_sep : ", ")
    }
    return value.toString()
  }
  def dflt = param.default != null ? renderValue(param.default) : null
  def example = param.example != null ? renderValue(param.example) : null

  def min = param.min?.toString()
  def max = param.max?.toString()

  def escapeChoice = { choice ->
    // String.replace is literal: newline -> backslash-n, quote -> backslash-quote
    def s1 = choice.replace("\n", "\\n")
    def s2 = s1.replace("\"", "\\\"")
    // quote the choice when it contains a comma or needed escaping
    s2.contains(",") || s2 != choice ? "\"" + s2 + "\"" : s2
  }
  def choices = param.choices == null ?
    null :
    "[ " + param.choices.collect{escapeChoice(it.toString())}.join(", ") + " ]"

  def namedPropsStr = [
    ["type", ([param.type] + unnamedProps).join(", ")],
    ["default", dflt],
    ["example", example],
    ["choices", choices],
    ["min", min],
    ["max", max]
  ]
    .findAll{it[1]}
    .collect{"\n " + it[0] + ": " + it[1].replace("\n", "\\n")}
    .join("")

  def descStr = param.description == null ?
    "" :
    _paragraphWrap("\n" + param.description.trim(), 80 - 8).join("\n ")

  "\n --" + param.plainName +
    namedPropsStr +
    descStr
}
/**
 * Builds the full help message of a component.
 * Based on Helper.generateHelp() in Helper.scala.
 *
 * The message is the concatenation of the name (and version), the wrapped
 * description, the usage string and one section per argument group.
 *
 * @param config The processed component config (needs `allArgumentGroups` and `allArguments`).
 * @return The help message, as a String.
 */
def _generateHelp(config) {
  // name and version
  def title = config.name + (config.version != null ? " " + config.version : "")

  // description, wrapped at 80 characters
  def descriptionPart = ""
  if (config.description != null) {
    descriptionPart = "\n\n" + _paragraphWrap(config.description.trim(), 80).join("\n")
  }

  // usage
  def usagePart = ""
  if (config.usage != null) {
    usagePart = "\n\nUsage:\n" + config.usage.trim()
  }

  // one section per argument group
  def groupParts = config.allArgumentGroups.collect { group ->
    def groupDescription = ""
    if (group.description != null) {
      groupDescription = "\n " + _paragraphWrap(group.description.trim(), 80-4).join("\n ") + "\n"
    }

    // group members may be given by name; look those up and drop unknown ones
    def resolvedArgs = group.arguments
      .collect { member ->
        member instanceof String ? config.allArguments.find{ it.plainName == member } : member
      }
      .findAll { it != null }

    def renderedArgs = resolvedArgs.collect { _generateArgumentHelp(it) }

    "\n\n${group.name}:" +
      groupDescription +
      renderedArgs.join("\n")
  }

  return title + descriptionPart + usagePart + groupParts.join("")
}
/**
 * Wraps a text into lines of at most `maxLength` characters.
 * Based on Format._paragraphWrap.
 *
 * The text is split on newlines and each resulting paragraph is wrapped
 * separately on whitespace. A single word longer than `maxLength` is kept
 * on a line of its own.
 *
 * A whitespace-only paragraph splits into zero words (Java's `split` drops
 * trailing empty strings); it is emitted as an empty line instead of
 * failing on an empty word list.
 *
 * @param str The text to wrap.
 * @param maxLength The maximum line length.
 * @return A List of wrapped lines.
 */
def _paragraphWrap(str, maxLength) {
  def outLines = []
  str.split("\n").each { par ->
    def words = par.split("\\s").toList()
    // guard against an empty word list (whitespace-only paragraph)
    def line = words.isEmpty() ? "" : words[0]
    for (word in words.drop(1)) {
      if (line.length() + word.length() + 1 <= maxLength) {
        line = line + " " + word
      } else {
        outLines.add(line)
        line = word
      }
    }
    outLines.add(line)
  }
  return outLines
}
/**
 * Prints the help message and exits when `--help` was passed.
 *
 * Does nothing unless `params.help` is present and truthy.
 *
 * @param config The component config.
 */
def helpMessage(config) {
  def helpRequested = params.containsKey("help") && params.help
  if (!helpRequested) {
    return
  }
  println(_generateHelp(addGlobalArguments(config)))
  exit 0
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/config/processConfig.nf'
/**
 * Normalises a component config in place and returns it.
 *
 * - every argument (top-level and inside argument groups) is passed through `_processArgument`
 * - `allArguments` is set to the top-level arguments followed by all grouped arguments
 * - `allArgumentGroups` is set to the argument groups, with the top-level arguments
 *   appended to an existing "arguments" group (case-insensitive) or added as a
 *   new "Arguments" group
 *   (based on Functionality::allArgumentGroups())
 *
 * @param config The component config, as a Map.
 * @return The same config object, with the fields above filled in.
 */
def processConfig(config) {
  def withDefaults = { argList -> (argList ?: []).collect{ _processArgument(it) } }

  // set defaults for arguments
  config.arguments = withDefaults(config.arguments)

  // set defaults for argument_group arguments
  config.argument_groups = (config.argument_groups ?: []).collect { group ->
    group.arguments = withDefaults(group.arguments)
    group
  }

  // create combined arguments list
  config.allArguments = config.arguments + config.argument_groups.collectMany{ it.arguments }

  // add missing argument groups
  def isDefaultGroup = { group -> group.name.toLowerCase() == "arguments" }
  def groups = config.argument_groups
  if (groups.any(isDefaultGroup)) {
    groups = groups.collect { group ->
      isDefaultGroup(group) ? group + [arguments: group.arguments + config.arguments] : group
    }
  } else {
    groups = groups + [[name: "Arguments", arguments: config.arguments]]
  }
  config.allArgumentGroups = groups

  return config
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/config/readConfig.nf'
/**
 * Reads a component config from a YAML file and processes it.
 *
 * @param file The config file; when not set, `config.vsh.yaml` in `moduleDir` is used.
 * @return The processed config.
 */
def readConfig(file) {
  def configFile = file ?: moduleDir.resolve("config.vsh.yaml")
  def rawConfig = readYaml(configFile)
  return processConfig(rawConfig)
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/functions/_resolveSiblingIfNotAbsolute.nf'
/**
 * Resolve a path relative to the current file.
 *
 * Values that are not a String are returned unchanged. Absolute paths
 * (see `_stringIsAbsolutePath`) are turned into a file object; relative
 * paths are resolved as a sibling of `parentPath`.
 *
 * @param str The path to resolve, as a String.
 * @param parentPath The path to resolve relative to, as a Path.
 *
 * @return The path that may have been resolved, as a Path.
 */
def _resolveSiblingIfNotAbsolute(str, parentPath) {
  if (str instanceof String) {
    return _stringIsAbsolutePath(str) ?
      file(str, hidden: true) :
      parentPath.resolveSibling(str)
  }
  return str
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/functions/_stringIsAbsolutePath.nf'
/**
 * Check whether a path as a string is absolute.
 *
 * A path counts as absolute when it starts with a slash followed by at least
 * one more character, optionally preceded by a protocol such as `s3:`.
 *
 * In the past, we tried using `file(., relative: true).isAbsolute()`,
 * but the 'relative' option was added in 22.10.0.
 *
 * @param path The path to check, as a String.
 *
 * @return Whether the path is absolute, as a boolean.
 */
def _stringIsAbsolutePath(path) {
  assert path instanceof String
  def absolutePattern = ~/^([a-zA-Z][a-zA-Z0-9]*:)?\\/.+/
  def matcher = absolutePattern.matcher(path)
  return matcher.matches()
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/functions/collectTraces.nf'
/**
 * Trace observer that appends a copy of each trace record's store to a list.
 *
 * Records are captured for both completed and cached processes; the
 * `script` entry of each copy is set to null before it is stored.
 */
class CustomTraceObserver implements nextflow.trace.TraceObserver {
  List traces

  CustomTraceObserver(List traces) {
    this.traces = traces
  }

  // copy the record's store, blank out the script and keep the copy
  private void keepTrace(nextflow.trace.TraceRecord trace) {
    def snapshot = trace.store.clone()
    snapshot.script = null
    traces.add(snapshot)
  }

  @Override
  void onProcessComplete(nextflow.processor.TaskHandler handler, nextflow.trace.TraceRecord trace) {
    keepTrace(trace)
  }

  @Override
  void onProcessCached(nextflow.processor.TaskHandler handler, nextflow.trace.TraceRecord trace) {
    keepTrace(trace)
  }
}
/**
 * Registers a CustomTraceObserver on the session and returns the
 * synchronized list that the observer appends its traces to.
 *
 * @return The list that will receive the traces.
 */
def collectTraces() {
  def collected = Collections.synchronizedList([])
  def observer = new CustomTraceObserver(collected)
  session.observers.add(observer)
  return collected
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/functions/deepClone.nf'
/**
 * Performs a deep clone of the given object.
 *
 * Lists and maps are rebuilt by `iterateMap`; every other value is cloned
 * when it is Cloneable and passed through unchanged otherwise.
 *
 * @param x an object
 * @return the cloned object
 */
def deepClone(x) {
  def cloneLeaf = { leaf -> leaf instanceof Cloneable ? leaf.clone() : leaf }
  return iterateMap(x, cloneLeaf)
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/functions/getPublishDir.nf'
/**
 * Looks up the publish directory in `params`.
 *
 * `publish_dir` takes precedence over `publishDir`.
 *
 * @return The configured value, or null when neither key is present.
 */
def getPublishDir() {
  if (params.containsKey("publish_dir")) {
    return params.publish_dir
  }
  if (params.containsKey("publishDir")) {
    return params.publishDir
  }
  return null
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/functions/getRootDir.nf'
/**
 * Walks upwards from the given path until a directory containing a
 * '.build.yaml' file is found.
 *
 * Symlinks are resolved with `toRealPath()` at every level.
 *
 * @param pathPossiblySymlink The path to start from.
 * @return The path of the '.build.yaml' file, or null when the filesystem
 *   root is reached without finding one.
 */
def _findBuildYamlFile(pathPossiblySymlink) {
  def current = pathPossiblySymlink.toRealPath()
  while (true) {
    def candidate = current.resolve(".build.yaml")
    if (java.nio.file.Files.isDirectory(current) && java.nio.file.Files.exists(candidate)) {
      return candidate
    }
    def parent = current.getParent()
    if (parent == null) {
      return null
    }
    current = parent.toRealPath()
  }
}
/**
 * Get the root of the target folder, i.e. the directory that holds the
 * '.build.yaml' file found by searching upwards from `meta.resources_dir`.
 *
 * Fails with an assertion error when no '.build.yaml' can be found.
 */
def getRootDir() {
  def dir = _findBuildYamlFile(meta.resources_dir)
  assert dir != null: "Could not find .build.yaml in the folder structure"
  return dir.getParent()
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/functions/iterateMap.nf'
/**
 * Recursively apply a function over the leaves of an object.
 *
 * Lists are traversed element by element and maps value by value; map keys
 * are converted to Strings. Anything else is treated as a leaf.
 *
 * @param obj The object to iterate over.
 * @param fun The function to apply to each value.
 * @return The object with the function applied to each value.
 */
def iterateMap(obj, fun) {
  if (obj instanceof List && obj !instanceof String) {
    return obj.collect { element -> iterateMap(element, fun) }
  }
  if (obj instanceof Map) {
    return obj.collectEntries { name, value ->
      [name.toString(), iterateMap(value, fun)]
    }
  }
  return fun(obj)
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/functions/niceView.nf'
/**
* A view for printing the event of each channel as a YAML blob.
* This is useful for debugging.
*
* @return A workflow that passes its input channel through `view`, rendering
*   each event with `toYamlBlob`, and emits the resulting channel.
*/
def niceView() {
workflow niceViewWf {
take: input
main:
// render every event as YAML before printing it
output = input
| view{toYamlBlob(it)}
emit: output
}
return niceViewWf
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/readwrite/readCsv.nf'
/**
 * Reads a CSV file into a list of maps, one map per data row.
 *
 * - Lines starting with '#' are skipped.
 * - The first remaining line is the header; its fields become the map keys.
 * - Fields are split on commas that are not inside double quotes, and one
 *   pair of surrounding double quotes is removed.
 * - Empty fields are left out of the row's map.
 *
 * The reader is always closed, also when an assertion fails, and the
 * matcher used for unquoting is a local variable.
 *
 * @param file_path The CSV file, as a Path or as something `file()` accepts.
 * @return A List of Maps from column name to field value.
 */
def readCsv(file_path) {
  def output = []
  def inputFile = file_path !instanceof Path ? file(file_path, hidden: true) : file_path

  // todo: allow escaped quotes in string
  // todo: allow single quotes?
  def splitRegex = java.util.regex.Pattern.compile(''',(?=(?:[^"]*"[^"]*")*[^"]*$)''')
  def removeQuote = java.util.regex.Pattern.compile('''"(.*)"''')

  // strip one pair of surrounding double quotes, if present
  def unquote = { field ->
    def m = removeQuote.matcher(field)
    m.find() ? m.replaceFirst('$1') : field
  }

  def br = java.nio.file.Files.newBufferedReader(inputFile)
  try {
    def row = -1
    def header = null

    // the header is the first line that is not a comment
    while (br.ready() && header == null) {
      def line = br.readLine()
      row++
      if (!line.startsWith("#")) {
        header = splitRegex.split(line, -1).collect{ field -> unquote(field) }
      }
    }
    assert header != null: "CSV file should contain a header"

    while (br.ready()) {
      def line = br.readLine()
      row++
      if (line == null) {
        break
      }
      if (!line.startsWith("#")) {
        // limit -1 keeps trailing empty fields so the column count stays correct
        def data = splitRegex.split(line, -1).collect{ field ->
          field == "" ? null : unquote(field)
        }
        assert header.size() == data.size(): "Row $row should contain the same number as fields as the header"
        def dataMap = [header, data].transpose().collectEntries().findAll{it.value != null}
        output.add(dataMap)
      }
    }
  } finally {
    br.close()
  }

  output
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/readwrite/readJson.nf'
/**
 * Parses a JSON file.
 *
 * @param file_path The JSON file, as a Path or as something `file()` accepts.
 * @return The parsed content.
 */
def readJson(file_path) {
  def source = file_path
  if (source !instanceof Path) {
    source = file(file_path, hidden: true)
  }
  return new groovy.json.JsonSlurper().parse(source)
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/readwrite/readJsonBlob.nf'
/**
 * Parses a JSON document passed as a String.
 *
 * @param str The JSON text.
 * @return The parsed content.
 */
def readJsonBlob(str) {
  return new groovy.json.JsonSlurper().parseText(str)
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/readwrite/readTaggedYaml.nf'
// Custom constructor to modify how certain objects are parsed from YAML.
// Scalars tagged with `!file` are turned into Path objects; when a root
// directory is set, the path is resolved relative to that root.
class CustomConstructor extends org.yaml.snakeyaml.constructor.Constructor {
// directory that `!file` values are resolved against; may be null
Path root
// Builds a Path from the scalar value of a `!file` node
class ConstructPath extends org.yaml.snakeyaml.constructor.AbstractConstruct {
public Object construct(org.yaml.snakeyaml.nodes.Node node) {
String filename = (String) constructScalar(node);
if (root != null) {
return root.resolve(filename);
}
// no root available: use the filename as-is
return java.nio.file.Paths.get(filename);
}
}
CustomConstructor(org.yaml.snakeyaml.LoaderOptions options, Path root) {
super(options)
this.root = root
// Handling !file tag and parse it back to a File type
this.yamlConstructors.put(new org.yaml.snakeyaml.nodes.Tag("!file"), new ConstructPath())
}
}
/**
 * Reads a YAML file in which `!file` tagged values are parsed into Paths.
 *
 * The parent directory of the YAML file is handed to CustomConstructor as
 * the root that `!file` values are resolved against.
 *
 * @param path The YAML file.
 * @return The parsed content.
 */
def readTaggedYaml(Path path) {
  def loaderOptions = new org.yaml.snakeyaml.LoaderOptions()
  def taggedConstructor = new CustomConstructor(loaderOptions, path.getParent())
  def parser = new org.yaml.snakeyaml.Yaml(taggedConstructor)
  return parser.load(path.text)
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/readwrite/readYaml.nf'
/**
 * Parses a YAML file.
 *
 * @param file_path The YAML file, as a Path or as something `file()` accepts.
 * @return The parsed content.
 */
def readYaml(file_path) {
  def source = file_path
  if (source !instanceof Path) {
    source = file(file_path, hidden: true)
  }
  return new org.yaml.snakeyaml.Yaml().load(source)
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/readwrite/readYamlBlob.nf'
/**
 * Parses a YAML document passed as a String.
 *
 * @param str The YAML text.
 * @return The parsed content.
 */
def readYamlBlob(str) {
  return new org.yaml.snakeyaml.Yaml().load(str)
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/readwrite/toJsonBlob.nf'
/**
 * Serialises data to a JSON String.
 *
 * @param data The data to serialise.
 * @return The JSON text.
 */
String toJsonBlob(data) {
  def json = groovy.json.JsonOutput.toJson(data)
  return json
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/readwrite/toTaggedYamlBlob.nf'
// Custom representer to modify how certain objects are represented in YAML.
// File and Path values are written as scalars tagged with `!file`; when a
// relativizer is set, the paths are written relative to it.
class CustomRepresenter extends org.yaml.snakeyaml.representer.Representer {
  // path that file values are made relative to; may be null
  Path relativizer

  // Represents a File or Path as a `!file` tagged scalar
  class RepresentPath implements org.yaml.snakeyaml.representer.Represent {
    public String getFileName(Object obj) {
      def candidate = obj instanceof File ? ((File) obj).toPath() : obj
      if (!(candidate instanceof Path)) {
        throw new IllegalArgumentException("Object: " + candidate + " is not a Path or File");
      }
      Path asPath = (Path) candidate
      return relativizer != null ?
        relativizer.relativize(asPath).toString() :
        asPath.toString()
    }

    public org.yaml.snakeyaml.nodes.Node representData(Object data) {
      def fileTag = new org.yaml.snakeyaml.nodes.Tag("!file")
      return representScalar(fileTag, getFileName(data))
    }
  }

  CustomRepresenter(org.yaml.snakeyaml.DumperOptions options, Path relativizer) {
    super(options)
    this.relativizer = relativizer
    // the representer keeps no state of its own, so one instance serves all types
    def pathRepresenter = new RepresentPath()
    for (fileType in [sun.nio.fs.UnixPath, Path, File]) {
      this.representers.put(fileType, pathRepresenter)
    }
  }
}
/**
 * Serialises data to YAML with `!file` tagged paths, without making the
 * paths relative.
 *
 * @param data The data to serialise.
 * @return The YAML text.
 */
String toTaggedYamlBlob(data) {
  Path noRelativizer = null
  return toRelativeTaggedYamlBlob(data, noRelativizer)
}
/**
 * Serialises data to block-style YAML with `!file` tagged paths.
 *
 * @param data The data to serialise.
 * @param relativizer The path that file values are made relative to; may be null.
 * @return The YAML text.
 */
String toRelativeTaggedYamlBlob(data, Path relativizer) {
  def dumperOptions = new org.yaml.snakeyaml.DumperOptions()
  dumperOptions.setDefaultFlowStyle(org.yaml.snakeyaml.DumperOptions.FlowStyle.BLOCK)
  def dumper = new org.yaml.snakeyaml.Yaml(
    new CustomRepresenter(dumperOptions, relativizer),
    dumperOptions
  )
  return dumper.dump(data)
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/readwrite/toYamlBlob.nf'
/**
 * Serialises data to block-style YAML.
 *
 * Path values anywhere in the data are converted to Strings first.
 *
 * @param data The data to serialise.
 * @return The YAML text.
 */
String toYamlBlob(data) {
  def dumperOptions = new org.yaml.snakeyaml.DumperOptions()
  dumperOptions.setDefaultFlowStyle(org.yaml.snakeyaml.DumperOptions.FlowStyle.BLOCK)
  dumperOptions.setPrettyFlow(true)

  def pathToString = { leaf -> leaf instanceof Path ? leaf.toString() : leaf }
  def printable = iterateMap(data, pathToString)

  return new org.yaml.snakeyaml.Yaml(dumperOptions).dump(printable)
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/readwrite/writeJson.nf'
/**
 * Writes data to a file as JSON.
 *
 * The arguments are checked against null explicitly. A plain `assert data`
 * applies Groovy truth and would reject valid values such as an empty list,
 * an empty map, `0` or `""` with a misleading "should not be null" message.
 *
 * @param data The data to serialise; must not be null.
 * @param file The file to write to; must not be null.
 */
void writeJson(data, file) {
  assert data != null: "writeJson: data should not be null"
  assert file != null: "writeJson: file should not be null"
  file.write(toJsonBlob(data))
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/readwrite/writeYaml.nf'
/**
 * Writes data to a file as YAML.
 *
 * The arguments are checked against null explicitly. A plain `assert data`
 * applies Groovy truth and would reject valid values such as an empty list,
 * an empty map, `0` or `""` with a misleading "should not be null" message.
 *
 * @param data The data to serialise; must not be null.
 * @param file The file to write to; must not be null.
 */
void writeYaml(data, file) {
  assert data != null: "writeYaml: data should not be null"
  assert file != null: "writeYaml: file should not be null"
  file.write(toYamlBlob(data))
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/states/findStates.nf'
/**
 * Creates a workflow that reads state YAML files and emits them as
 * `[id, state]` tuples.
 *
 * The arguments of the given config are replaced by a fixed set
 * (`--id`, `--input_states`, `--filter`, `--rename_keys`, `--settings`);
 * the remaining config entries are kept.
 *
 * @param params The parameters to build the input channel from.
 * @param config The component config to derive the workflow's config from.
 * @return The `findStatesWf` workflow.
 */
def findStates(Map params, Map config) {
// work on copies so the caller's config and params are not modified
def auto_config = deepClone(config)
def auto_params = deepClone(params)
// NOTE(review): this extra shallow clone looks redundant after deepClone — confirm
auto_config = auto_config.clone()
// override arguments
auto_config.argument_groups = []
auto_config.arguments = [
[
type: "string",
name: "--id",
description: "A dummy identifier",
required: false
],
[
type: "file",
name: "--input_states",
example: "/path/to/input/directory/**/state.yaml",
description: "Path to input directory containing the datasets to be integrated.",
required: true,
multiple: true,
multiple_sep: ";"
],
[
type: "string",
name: "--filter",
example: "foo/.*/state.yaml",
description: "Regex to filter state files by path.",
required: false
],
// to do: make this a yaml blob?
[
type: "string",
name: "--rename_keys",
example: ["newKey1:oldKey1", "newKey2:oldKey2"],
description: "Rename keys in the detected input files. This is useful if the input files do not match the set of input arguments of the workflow.",
required: false,
multiple: true,
multiple_sep: ";"
],
[
type: "string",
name: "--settings",
example: '{"output_dataset": "dataset.h5ad", "k": 10}',
description: "Global arguments as a JSON glob to be passed to all components.",
required: false
]
]
// fall back to a fixed id when none was provided
if (!(auto_params.containsKey("id"))) {
auto_params["id"] = "auto"
}
// run auto config through processConfig once more
auto_config = processConfig(auto_config)
workflow findStatesWf {
helpMessage(auto_config)
output_ch =
channelFromParams(auto_params, auto_config)
| flatMap { autoId, args ->
// the settings blob is parsed as YAML; empty map when not provided
def globalSettings = args.settings ? readYamlBlob(args.settings) : [:]
// look for state files in input dir
def stateFiles = args.input_states
// filter state files by regex
if (args.filter) {
stateFiles = stateFiles.findAll{ stateFile ->
def stateFileStr = stateFile.toString()
def matcher = stateFileStr =~ args.filter
// the whole path has to match the regex
matcher.matches()}
}
// read in states
def states = stateFiles.collect { stateFile ->
def state_ = readTaggedYaml(stateFile)
[state_.id, state_]
}
// construct renameMap
if (args.rename_keys) {
def renameMap = args.rename_keys.collectEntries{renameString ->
def split = renameString.split(":")
assert split.size() == 2: "Argument 'rename_keys' should be of the form 'newKey:oldKey', or 'newKey:oldKey;newKey:oldKey' in case of multiple values"
split
}
// rename keys in state, only let states through which have all keys
// also add global settings
// NOTE(review): global settings are only merged in when rename_keys is set — confirm this is intended
states = states.collectMany{id, state ->
def newState = [:]
for (key in renameMap.keySet()) {
def origKey = renameMap[key]
if (!(state.containsKey(origKey))) {
// a required key is missing: drop this state
return []
}
newState[key] = state[origKey]
}
[[id, globalSettings + newState]]
}
}
states
}
emit:
output_ch
}
return findStatesWf
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/states/joinStates.nf'
/**
 * Creates a workflow that collects all `[id, state, ...]` events of a channel
 * and combines them with a single call to the given closure.
 *
 * @param apply_ Closure called as `apply_(ids, states)`, where `ids` is the list
 *   of first tuple elements and `states` the list of second tuple elements.
 * @return The `joinStatesWf` workflow, emitting the closure's result.
 */
def joinStates(Closure apply_) {
workflow joinStatesWf {
take: input_ch
main:
output_ch = input_ch
| toSortedList
// skip the join when the channel did not contain any events
| filter{ it.size() > 0 }
| map{ tups ->
def ids = tups.collect{it[0]}
def states = tups.collect{it[1]}
apply_(ids, states)
}
emit: output_ch
}
return joinStatesWf
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/states/publishFiles.nf'
/**
 * Creates a workflow that publishes all files found in the state of each event.
 *
 * For every `[id, state, ...]` event, the files in the state are collected with
 * `collectInputOutputPaths` using the prefix `<id>.<key>` and passed to
 * `publishFilesProc`. The workflow emits its input channel unchanged.
 *
 * @param args Map with a required `key` entry.
 * @return The `publishFilesWf` workflow.
 */
def publishFiles(Map args) {
def key_ = args.get("key")
assert key_ != null : "publishFiles: key must be specified"
workflow publishFilesWf {
take: input_ch
main:
input_ch
| map { tup ->
def id_ = tup[0]
def state_ = tup[1]
// the input files and the target output filenames
// (transpose turns the list of [input, output] pairs into [inputs, outputs])
def inputoutputFilenames_ = collectInputOutputPaths(state_, id_ + "." + key_).transpose()
def inputFiles_ = inputoutputFilenames_[0]
def outputFilenames_ = inputoutputFilenames_[1]
[id_, inputFiles_, outputFilenames_]
}
| publishFilesProc
emit: input_ch
}
return publishFilesWf
}
// Copies the staged input files to their target output filenames and
// publishes the results (mode "copy") to the directory given by getPublishDir().
process publishFilesProc {
// todo: check publishpath?
publishDir path: "${getPublishDir()}/", mode: "copy"
tag "$id"
input:
tuple val(id), path(inputFiles, stageAs: "_inputfile?/*"), val(outputFiles)
output:
tuple val(id), path{outputFiles}
script:
// pair every input file with its output filename; both sides may be a
// single value instead of a list
def copyCommands = [
inputFiles instanceof List ? inputFiles : [inputFiles],
outputFiles instanceof List ? outputFiles : [outputFiles]
]
.transpose()
.collectMany{infile, outfile ->
if (infile.toString() != outfile.toString()) {
// create the destination directory if needed, then copy
// NOTE(review): filenames are placed inside single quotes in the shell script;
// a filename containing a single quote would presumably break the command — verify
[
"[ -d \"\$(dirname '${outfile.toString()}')\" ] || mkdir -p \"\$(dirname '${outfile.toString()}')\"",
"cp -r '${infile.toString()}' '${outfile.toString()}'"
]
} else {
// no need to copy if infile is the same as outfile
[]
}
}
"""
echo "Copying output files to destination folder"
${copyCommands.join("\n ")}
"""
}
/**
 * Creates a workflow that publishes the output files of a component, using
 * the output arguments in the component's config to decide which state
 * entries are files and what their target filenames are.
 *
 * Events are expected to be `[id, state, origState]` tuples. The workflow
 * emits its input channel unchanged.
 *
 * This assumes that the state contains no other values other than those specified in the config.
 *
 * @param args Map with a required `config` entry and an optional `key` entry
 *   (defaults to `config.name`).
 * @return The `publishFilesSimpleWf` workflow.
 */
def publishFilesByConfig(Map args) {
def config = args.get("config")
assert config != null : "publishFilesByConfig: config must be specified"
def key_ = args.get("key", config.name)
assert key_ != null : "publishFilesByConfig: key must be specified"
workflow publishFilesSimpleWf {
take: input_ch
main:
input_ch
| map { tup ->
def id_ = tup[0]
def state_ = tup[1] // e.g. [output: new File("myoutput.h5ad"), k: 10]
def origState_ = tup[2] // e.g. [output: '$id.$key.foo.h5ad']
// the processed state is a list of [key, value, inputPath, outputFilename] tuples, where
// - key is a String
// - value is any object that can be serialized to a Yaml (so a String/Integer/Long/Double/Boolean, a List, a Map, or a Path)
// - inputPath is a List[Path]
// - outputFilename is a List[String]
// - (inputPath, outputFilename) are the files that will be copied from src to dest (relative to the state.yaml)
def processedState =
config.allArguments
.findAll { it.direction == "output" }
.collectMany { par ->
def plainName_ = par.plainName
// if the state does not contain the key, it's an
// optional argument for which the component did
// not generate any output OR multiple channels were emitted
// and the output was just not added to using the channel
// that is now being parsed
if (!state_.containsKey(plainName_)) {
return []
}
def value = state_[plainName_]
// if the parameter is not a file, it should be stored
// in the state as-is, but is not something that needs
// to be copied from the source path to the dest path
if (par.type != "file") {
return [[inputPath: [], outputFilename: []]]
}
// if the orig state does not contain this filename,
// it's an optional argument for which the user specified
// that it should not be returned as a state
if (!origState_.containsKey(plainName_)) {
return []
}
def filenameTemplate = origState_[plainName_]
// if the pararameter is multiple: true, fetch the template
if (par.multiple && filenameTemplate instanceof List) {
filenameTemplate = filenameTemplate[0]
}
// instantiate the template
// (both the `$id` / `$key` and the `${id}` / `${key}` notations are replaced)
def filename = filenameTemplate
.replaceAll('\\$id', id_)
.replaceAll('\\$\\{id\\}', id_)
.replaceAll('\\$key', key_)
.replaceAll('\\$\\{key\\}', key_)
if (par.multiple) {
// if the parameter is multiple: true, the filename
// should contain a wildcard '*' that is replaced with
// the index of the file
assert filename.contains("*") : "Module '${key_}' id '${id_}': Multiple output files specified, but no wildcard '*' in the filename: ${filename}"
def outputPerFile = value.withIndex().collect{ val, ix ->
def filename_ix = filename.replace("*", ix.toString())
def inputPath = val instanceof File ? val.toPath() : val
[inputPath: inputPath, outputFilename: filename_ix]
}
// turn the list of per-file maps into one map of lists
def transposedOutputs = ["inputPath", "outputFilename"].collectEntries{ key ->
[key, outputPerFile.collect{dic -> dic[key]}]
}
return [[key: plainName_] + transposedOutputs]
} else {
// NOTE(review): value_ is not used afterwards — confirm whether it can be removed
def value_ = java.nio.file.Paths.get(filename)
def inputPath = value instanceof File ? value.toPath() : value
return [[inputPath: [inputPath], outputFilename: [filename]]]
}
}
// flatten the per-argument lists into one list of inputs and one of outputs
def inputPaths = processedState.collectMany{it.inputPath}
def outputFilenames = processedState.collectMany{it.outputFilename}
[id_, inputPaths, outputFilenames]
}
| publishFilesProc
emit: input_ch
}
return publishFilesSimpleWf
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/states/publishStates.nf'
/**
 * Recursively collects all File and Path objects found in an object.
 *
 * Lists are searched element by element and maps value by value (keys are
 * ignored); any other value contributes nothing.
 *
 * @param obj The object to search.
 * @return A flat List of the files found.
 */
def collectFiles(obj) {
  if (obj instanceof java.io.File || obj instanceof Path) {
    return [obj]
  }
  if (obj instanceof List && obj !instanceof String) {
    return obj.collectMany { element -> collectFiles(element) }
  }
  if (obj instanceof Map) {
    return obj.collectMany { name, value -> collectFiles(value) }
  }
  return []
}
/**
 * Recurse through a state and collect all input files and their target output filenames.
 *
 * The output filename of a file is the prefix followed by the file's last
 * extension (if any). While recursing, list items extend the prefix with
 * `_<index>` and map entries extend it with `.<key>`.
 *
 * @param obj The state to recurse through.
 * @param prefix The prefix to prepend to the output filenames.
 * @return A List of `[file, outputFilename]` pairs.
 */
def collectInputOutputPaths(obj, prefix) {
  if (obj instanceof File || obj instanceof Path) {
    def asPath = obj instanceof Path ? obj : obj.toPath()
    def basename = asPath.getFileName().toString()
    // last extension including the dot, or nothing when there is none
    def extension = basename.find("\\.[^\\.]+\$") ?: ""
    return [[obj, prefix + extension]]
  }
  if (obj instanceof List && obj !instanceof String) {
    return obj.withIndex().collectMany { element, index ->
      collectInputOutputPaths(element, prefix + "_" + index)
    }
  }
  if (obj instanceof Map) {
    return obj.collectMany { name, value ->
      collectInputOutputPaths(value, prefix + "." + name)
    }
  }
  return []
}
/**
 * Creates a workflow that writes the state of each event to a YAML file.
 *
 * For every `[id, state, ...]` event, the state (with the id added) is
 * converted to a tagged YAML blob and handed to `publishStatesProc` together
 * with the target filename. The workflow emits its input channel unchanged.
 *
 * @param args Map with a required `key` entry and an optional `output_state`
 *   (or `outputState`) filename template; the template defaults to
 *   '$id.$key.state.yaml'.
 * @return The `publishStatesWf` workflow.
 */
def publishStates(Map args) {
def key_ = args.get("key")
def yamlTemplate_ = args.get("output_state", args.get("outputState", '$id.$key.state.yaml'))
assert key_ != null : "publishStates: key must be specified"
workflow publishStatesWf {
take: input_ch
main:
input_ch
| map { tup ->
def id_ = tup[0]
def state_ = tup[1]
// the input files and the target output filenames
// NOTE(review): inputoutputFilenames_ is not used further down — confirm whether it is still needed
def inputoutputFilenames_ = collectInputOutputPaths(state_, id_ + "." + key_).transpose()
// instantiate the filename template
// (both the `$id` / `$key` and the `${id}` / `${key}` notations are replaced)
def yamlFilename = yamlTemplate_
.replaceAll('\\$id', id_)
.replaceAll('\\$\\{id\\}', id_)
.replaceAll('\\$key', key_)
.replaceAll('\\$\\{key\\}', key_)
// TODO: do the pathnames in state_ match up with the outputFilenames_?
// convert state to yaml blob
// NOTE(review): the yaml filename itself (not its parent directory) is passed as relativizer — confirm this is intended
def yamlBlob_ = toRelativeTaggedYamlBlob([id: id_] + state_, java.nio.file.Paths.get(yamlFilename))
[id_, yamlBlob_, yamlFilename]
}
| publishStatesProc
emit: input_ch
}
return publishStatesWf
}
// Writes a YAML blob to the given file and publishes that file (mode "copy")
// to the directory given by getPublishDir().
process publishStatesProc {
// todo: check publishpath?
publishDir path: "${getPublishDir()}/", mode: "copy"
tag "$id"
input:
tuple val(id), val(yamlBlob), val(yamlFile)
output:
tuple val(id), path{[yamlFile]}
script:
// the blob is written through a shell here-document after creating the
// parent directory of the target file
// NOTE(review): the here-document delimiter is unquoted, so the shell presumably
// expands `$` and backticks inside the blob — verify
"""
mkdir -p "\$(dirname '${yamlFile}')"
echo "Storing state as yaml"
cat > '${yamlFile}' << HERE
${yamlBlob}
HERE
"""
}
/**
 * Creates a workflow that writes the state of each event to a YAML file, using
 * the output arguments in the component's config to decide which state entries
 * are stored and how file entries are named.
 *
 * Events are expected to be `[id, state, origState]` tuples. The filename of
 * the state file is taken from `params.output_state` when present and defaults
 * to '$id.$key.state.yaml'. The workflow emits its input channel unchanged.
 *
 * This assumes that the state contains no other values other than those specified in the config.
 *
 * @param args Map with a required `config` entry and an optional `key` entry
 *   (defaults to `config.name`).
 * @return The `publishStatesSimpleWf` workflow.
 */
def publishStatesByConfig(Map args) {
def config = args.get("config")
assert config != null : "publishStatesByConfig: config must be specified"
def key_ = args.get("key", config.name)
assert key_ != null : "publishStatesByConfig: key must be specified"
workflow publishStatesSimpleWf {
take: input_ch
main:
input_ch
| map { tup ->
def id_ = tup[0]
def state_ = tup[1] // e.g. [output: new File("myoutput.h5ad"), k: 10]
def origState_ = tup[2] // e.g. [output: '$id.$key.foo.h5ad']
// TODO: allow overriding the state.yaml template
// TODO TODO: if auto.publish == "state", add output_state as an argument
def yamlTemplate = params.containsKey("output_state") ? params.output_state : '$id.$key.state.yaml'
// instantiate the filename template
// (both the `$id` / `$key` and the `${id}` / `${key}` notations are replaced)
def yamlFilename = yamlTemplate
.replaceAll('\\$id', id_)
.replaceAll('\\$\\{id\\}', id_)
.replaceAll('\\$key', key_)
.replaceAll('\\$\\{key\\}', key_)
// directory of the state file; null when the filename has no parent
def yamlDir = java.nio.file.Paths.get(yamlFilename).getParent()
// the processed state is a list of [key, value] tuples, where
// - key is a String
// - value is any object that can be serialized to a Yaml (so a String/Integer/Long/Double/Boolean, a List, a Map, or a Path)
// - (key, value) are the tuples that will be saved to the state.yaml file
def processedState =
config.allArguments
.findAll { it.direction == "output" }
.collectMany { par ->
def plainName_ = par.plainName
// if the state does not contain the key, it's an
// optional argument for which the component did
// not generate any output
if (!state_.containsKey(plainName_)) {
return []
}
def value = state_[plainName_]
// if the parameter is not a file, it should be stored
// in the state as-is, but is not something that needs
// to be copied from the source path to the dest path
if (par.type != "file") {
return [[key: plainName_, value: value]]
}
// if the orig state does not contain this filename,
// it's an optional argument for which the user specified
// that it should not be returned as a state
if (!origState_.containsKey(plainName_)) {
return []
}
def filenameTemplate = origState_[plainName_]
// if the pararameter is multiple: true, fetch the template
if (par.multiple && filenameTemplate instanceof List) {
filenameTemplate = filenameTemplate[0]
}
// instantiate the template
def filename = filenameTemplate
.replaceAll('\\$id', id_)
.replaceAll('\\$\\{id\\}', id_)
.replaceAll('\\$key', key_)
.replaceAll('\\$\\{key\\}', key_)
if (par.multiple) {
// if the parameter is multiple: true, the filename
// should contain a wildcard '*' that is replaced with
// the index of the file
assert filename.contains("*") : "Module '${key_}' id '${id_}': Multiple output files specified, but no wildcard '*' in the filename: ${filename}"
def outputPerFile = value.withIndex().collect{ val, ix ->
def filename_ix = filename.replace("*", ix.toString())
def value_ = java.nio.file.Paths.get(filename_ix)
// if id contains a slash
// (store the path relative to the directory of the state file)
if (yamlDir != null) {
value_ = yamlDir.relativize(value_)
}
return value_
}
return [["key": plainName_, "value": outputPerFile]]
} else {
def value_ = java.nio.file.Paths.get(filename)
// if id contains a slash
// (store the path relative to the directory of the state file)
if (yamlDir != null) {
value_ = yamlDir.relativize(value_)
}
// NOTE(review): inputPath is not used afterwards — confirm whether it can be removed
def inputPath = value instanceof File ? value.toPath() : value
return [["key": plainName_, value: value_]]
}
}
def updatedState_ = processedState.collectEntries{[it.key, it.value]}
// convert state to yaml blob
def yamlBlob_ = toTaggedYamlBlob([id: id_] + updatedState_)
[id_, yamlBlob_, yamlFilename]
}
| publishStatesProc
emit: input_ch
}
return publishStatesSimpleWf
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/states/setState.nf'
/**
 * Creates a `map` operation that replaces the state (second element) of
 * each `[id, state, ...]` tuple.
 *
 * The new state can be described in three ways:
 * - a Closure, called as `fun(id, state)`, returning the new state
 * - a Map from new key to original key: the listed keys are copied (and
 *   renamed) from the old state when present
 * - a List of keys: shorthand for a Map in which every key maps to itself
 *
 * Entries with a null value are removed from the new state. Tuple elements
 * after the state are passed on unchanged.
 *
 * @param fun A Closure, a Map or a List, as described above.
 */
def setState(fun) {
  assert fun instanceof Closure || fun instanceof Map || fun instanceof List :
    "Error in setState: Expected process argument to be a Closure, a Map, or a List. Found: class ${fun.getClass()}"

  // a list of keys is shorthand for an identity mapping
  if (fun instanceof List) {
    // check whether fun is a list[string]
    assert fun.every{it instanceof CharSequence} : "Error in setState: argument is a List, but not all elements are Strings"
    fun = fun.collectEntries{ name -> [name, name] }
  }

  // a mapping is turned into a closure that selects and renames keys
  if (fun instanceof Map) {
    // check whether fun is a map[string, string]
    assert fun.values().every{it instanceof CharSequence} : "Error in setState: argument is a Map, but not all values are Strings"
    assert fun.keySet().every{it instanceof CharSequence} : "Error in setState: argument is a Map, but not all keys are Strings"
    def keyMapping = fun.clone()
    fun = { id_, state_ ->
      assert state_ instanceof Map : "Error in setState: the state is not a Map"
      keyMapping
        .findAll{ newkey, origkey -> state_.containsKey(origkey) }
        .collectEntries{ newkey, origkey -> [newkey, state_[origkey]] }
    }
  }

  map { tup ->
    def id = tup[0]
    def state = tup[1]
    // drop entries without a value
    def newState = fun(id, state).findAll{ key, val -> val != null }
    [id, newState] + tup.drop(2)
  }
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/workflowFactory/processAuto.nf'
// TODO: unit test processAuto
/**
 * Validates the `auto` settings of a module.
 *
 * Null values are dropped first. The remaining keys must all be one of
 * `simplifyInput`, `simplifyOutput`, `transcript` and `publish`; the first
 * three must be booleans, and `publish` must be a boolean or the string "state".
 *
 * @param auto The auto settings, as a Map.
 * @return A Map containing the expected keys of the validated settings.
 */
def processAuto(Map auto) {
// remove null values
auto = auto.findAll{k, v -> v != null}
// check for unexpected keys
def expectedKeys = ["simplifyInput", "simplifyOutput", "transcript", "publish"]
def unexpectedKeys = auto.keySet() - expectedKeys
assert unexpectedKeys.isEmpty(), "unexpected keys in auto: '${unexpectedKeys.join("', '")}'"
// check auto.simplifyInput
assert auto.simplifyInput instanceof Boolean, "auto.simplifyInput must be a boolean"
// check auto.simplifyOutput
assert auto.simplifyOutput instanceof Boolean, "auto.simplifyOutput must be a boolean"
// check auto.transcript
assert auto.transcript instanceof Boolean, "auto.transcript must be a boolean"
// check auto.publish
assert auto.publish instanceof Boolean || auto.publish == "state", "auto.publish must be a boolean or 'state'"
return auto.subMap(expectedKeys)
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/workflowFactory/processDirectives.nf'
/**
 * Asserts that a map only contains expected keys and contains all required keys.
 *
 * @param map The object to check; must be a Map.
 * @param expectedKeys The keys that are allowed in the map.
 * @param requiredKeys The keys that have to be present in the map.
 * @param mapName The name of the map, used in the error messages.
 */
def assertMapKeys(map, expectedKeys, requiredKeys, mapName) {
  assert map instanceof Map : "Expected argument '$mapName' to be a Map. Found: class ${map.getClass()}"
  map.forEach { key, val ->
    assert key in expectedKeys : "Unexpected key '$key' in ${mapName ? mapName + " " : ""}map"
  }
  requiredKeys.forEach { requiredKey ->
    // the message has to refer to `requiredKey`: no `key` variable exists in this
    // closure, so referring to it would fail while building the error message
    assert map.containsKey(requiredKey) : "Missing required key '$requiredKey' in ${mapName ? mapName + " " : ""}map"
  }
}
// TODO: unit test processDirectives
/**
 * Validates a map of process directives and normalises several of them.
 *
 * Entries with a null value are dropped first. Every remaining key must be
 * one of the directive names listed in 'expectedKeys'. Each directive that is
 * present is then type-checked with an assert, and some are rewritten:
 *   - conda, containerOptions: a List is joined with " "
 *   - module: a List is joined with ":"
 *   - queue: a List is joined with ","
 *   - container: a Map [registry, image, tag] becomes "registry/image:tag";
 *     the registry prefix is taken from the OVERRIDE_CONTAINER_REGISTRY
 *     environment variable, else params.override_container_registry, else
 *     the map's own 'registry', else it is left out; a missing tag becomes
 *     ":latest"
 *   - pod: a single Map is wrapped in a List
 *   - publishDir: a String or Map is turned into a List of Maps with a
 *     'path' entry
 *   - label: a single String is wrapped in a List
 *
 * @param drctv The raw directives map.
 * @return The filtered map with the rewritten values described above.
 * @throws AssertionError when a key is unknown or a value fails its check.
 */
def processDirectives(Map drctv) {
// remove null values
drctv = drctv.findAll{k, v -> v != null}
// check for unexpected keys
def expectedKeys = [
"accelerator", "afterScript", "beforeScript", "cache", "conda", "container", "containerOptions", "cpus", "disk", "echo", "errorStrategy", "executor", "machineType", "maxErrors", "maxForks", "maxRetries", "memory", "module", "penv", "pod", "publishDir", "queue", "label", "scratch", "storeDir", "stageInMode", "stageOutMode", "tag", "time"
]
def unexpectedKeys = drctv.keySet() - expectedKeys
assert unexpectedKeys.isEmpty() : "Unexpected keys in process directive: '${unexpectedKeys.join("', '")}'"
/* DIRECTIVE accelerator
accepted examples:
- [ limit: 4, type: "nvidia-tesla-k80" ]
*/
if (drctv.containsKey("accelerator")) {
assertMapKeys(drctv["accelerator"], ["type", "limit", "request", "runtime"], [], "accelerator")
}
/* DIRECTIVE afterScript
accepted examples:
- "source /cluster/bin/cleanup"
*/
if (drctv.containsKey("afterScript")) {
assert drctv["afterScript"] instanceof CharSequence
}
/* DIRECTIVE beforeScript
accepted examples:
- "source /cluster/bin/setup"
*/
if (drctv.containsKey("beforeScript")) {
assert drctv["beforeScript"] instanceof CharSequence
}
/* DIRECTIVE cache
accepted examples:
- true
- false
- "deep"
- "lenient"
*/
if (drctv.containsKey("cache")) {
assert drctv["cache"] instanceof CharSequence || drctv["cache"] instanceof Boolean
if (drctv["cache"] instanceof CharSequence) {
assert drctv["cache"] in ["deep", "lenient"] : "Unexpected value for cache"
}
}
/* DIRECTIVE conda
accepted examples:
- "bwa=0.7.15"
- "bwa=0.7.15 fastqc=0.11.5"
- ["bwa=0.7.15", "fastqc=0.11.5"]
*/
if (drctv.containsKey("conda")) {
if (drctv["conda"] instanceof List) {
drctv["conda"] = drctv["conda"].join(" ")
}
assert drctv["conda"] instanceof CharSequence
}
/* DIRECTIVE container
accepted examples:
- "foo/bar:tag"
- [ registry: "reg", image: "im", tag: "ta" ]
is transformed to "reg/im:ta"
- [ image: "im" ]
is transformed to "im:latest"
*/
if (drctv.containsKey("container")) {
assert drctv["container"] instanceof Map || drctv["container"] instanceof CharSequence
if (drctv["container"] instanceof Map) {
def m = drctv["container"]
assertMapKeys(m, [ "registry", "image", "tag" ], ["image"], "container")
// registry prefix: environment variable wins over params, which wins over the map's own registry
def part1 =
System.getenv('OVERRIDE_CONTAINER_REGISTRY') ? System.getenv('OVERRIDE_CONTAINER_REGISTRY') + "/" :
params.containsKey("override_container_registry") ? params["override_container_registry"] + "/" : // todo: remove?
m.registry ? m.registry + "/" :
""
def part2 = m.image
def part3 = m.tag ? ":" + m.tag : ":latest"
drctv["container"] = part1 + part2 + part3
}
}
/* DIRECTIVE containerOptions
accepted examples:
- "--foo bar"
- ["--foo bar", "-f b"]
*/
if (drctv.containsKey("containerOptions")) {
if (drctv["containerOptions"] instanceof List) {
drctv["containerOptions"] = drctv["containerOptions"].join(" ")
}
assert drctv["containerOptions"] instanceof CharSequence
}
/* DIRECTIVE cpus
accepted examples:
- 1
- 10
*/
if (drctv.containsKey("cpus")) {
assert drctv["cpus"] instanceof Integer
}
/* DIRECTIVE disk
accepted examples:
- "1 GB"
- "2TB"
- "3.2KB"
- "10.B"
*/
if (drctv.containsKey("disk")) {
assert drctv["disk"] instanceof CharSequence
// assert drctv["disk"].matches("[0-9]+(\\.[0-9]*)? *[KMGTPEZY]?B")
// ^ does not allow closures
}
/* DIRECTIVE echo
accepted examples:
- true
- false
*/
if (drctv.containsKey("echo")) {
assert drctv["echo"] instanceof Boolean
}
/* DIRECTIVE errorStrategy
accepted examples:
- "terminate"
- "finish"
*/
if (drctv.containsKey("errorStrategy")) {
assert drctv["errorStrategy"] instanceof CharSequence
assert drctv["errorStrategy"] in ["terminate", "finish", "ignore", "retry"] : "Unexpected value for errorStrategy"
}
/* DIRECTIVE executor
accepted examples:
- "local"
- "sge"
*/
if (drctv.containsKey("executor")) {
assert drctv["executor"] instanceof CharSequence
assert drctv["executor"] in ["local", "sge", "uge", "lsf", "slurm", "pbs", "pbspro", "moab", "condor", "nqsii", "ignite", "k8s", "awsbatch", "google-pipelines"] : "Unexpected value for executor"
}
/* DIRECTIVE machineType
accepted examples:
- "n1-highmem-8"
*/
if (drctv.containsKey("machineType")) {
assert drctv["machineType"] instanceof CharSequence
}
/* DIRECTIVE maxErrors
accepted examples:
- 1
- 3
*/
if (drctv.containsKey("maxErrors")) {
assert drctv["maxErrors"] instanceof Integer
}
/* DIRECTIVE maxForks
accepted examples:
- 1
- 3
*/
if (drctv.containsKey("maxForks")) {
assert drctv["maxForks"] instanceof Integer
}
/* DIRECTIVE maxRetries
accepted examples:
- 1
- 3
*/
if (drctv.containsKey("maxRetries")) {
assert drctv["maxRetries"] instanceof Integer
}
/* DIRECTIVE memory
accepted examples:
- "1 GB"
- "2TB"
- "3.2KB"
- "10.B"
*/
if (drctv.containsKey("memory")) {
assert drctv["memory"] instanceof CharSequence
// assert drctv["memory"].matches("[0-9]+(\\.[0-9]*)? *[KMGTPEZY]?B")
// ^ does not allow closures
}
/* DIRECTIVE module
accepted examples:
- "ncbi-blast/2.2.27"
- "ncbi-blast/2.2.27:t_coffee/10.0"
- ["ncbi-blast/2.2.27", "t_coffee/10.0"]
*/
if (drctv.containsKey("module")) {
if (drctv["module"] instanceof List) {
drctv["module"] = drctv["module"].join(":")
}
assert drctv["module"] instanceof CharSequence
}
/* DIRECTIVE penv
accepted examples:
- "smp"
*/
if (drctv.containsKey("penv")) {
assert drctv["penv"] instanceof CharSequence
}
/* DIRECTIVE pod
accepted examples:
- [ label: "key", value: "val" ]
- [ annotation: "key", value: "val" ]
- [ env: "key", value: "val" ]
- [ [label: "l", value: "v"], [env: "e", value: "v"]]
*/
if (drctv.containsKey("pod")) {
if (drctv["pod"] instanceof Map) {
drctv["pod"] = [ drctv["pod"] ]
}
assert drctv["pod"] instanceof List
drctv["pod"].forEach { pod ->
assert pod instanceof Map
// TODO: should more checks be added?
// See https://www.nextflow.io/docs/latest/process.html?highlight=directives#pod
// e.g. does it contain 'label' and 'value', or 'annotation' and 'value', or ...?
}
}
/* DIRECTIVE publishDir
accepted examples:
- []
- [ [ path: "foo", enabled: true ], [ path: "bar", enabled: false ] ]
- "/path/to/dir"
is transformed to [[ path: "/path/to/dir" ]]
- [ path: "/path/to/dir", mode: "cache" ]
is transformed to [[ path: "/path/to/dir", mode: "cache" ]]
*/
// TODO: should we also look at params["publishDir"]?
if (drctv.containsKey("publishDir")) {
def pblsh = drctv["publishDir"]
// check different options
assert pblsh instanceof List || pblsh instanceof Map || pblsh instanceof CharSequence
// turn into list if not already so
// for some reason, 'if (!pblsh instanceof List) pblsh = [ pblsh ]' doesn't work.
pblsh = pblsh instanceof List ? pblsh : [ pblsh ]
// check elements of publishDir
pblsh = pblsh.collect{ elem ->
// turn into map if not already so
elem = elem instanceof CharSequence ? [ path: elem ] : elem
// check types and keys
assert elem instanceof Map : "Expected publish argument '$elem' to be a String or a Map. Found: class ${elem.getClass()}"
assertMapKeys(elem, [ "path", "mode", "overwrite", "pattern", "saveAs", "enabled" ], ["path"], "publishDir")
// check elements in map
assert elem.containsKey("path")
assert elem["path"] instanceof CharSequence
if (elem.containsKey("mode")) {
assert elem["mode"] instanceof CharSequence
assert elem["mode"] in [ "symlink", "rellink", "link", "copy", "copyNoFollow", "move" ]
}
if (elem.containsKey("overwrite")) {
assert elem["overwrite"] instanceof Boolean
}
if (elem.containsKey("pattern")) {
assert elem["pattern"] instanceof CharSequence
}
if (elem.containsKey("saveAs")) {
assert elem["saveAs"] instanceof CharSequence //: "saveAs as a Closure is currently not supported. Surround your closure with single quotes to get the desired effect. Example: '\{ foo \}'"
}
if (elem.containsKey("enabled")) {
assert elem["enabled"] instanceof Boolean
}
// return final result
elem
}
// store final directive
drctv["publishDir"] = pblsh
}
/* DIRECTIVE queue
accepted examples:
- "long"
- "short,long"
- ["short", "long"]
*/
if (drctv.containsKey("queue")) {
if (drctv["queue"] instanceof List) {
drctv["queue"] = drctv["queue"].join(",")
}
assert drctv["queue"] instanceof CharSequence
}
/* DIRECTIVE label
accepted examples:
- "big_mem"
- "big_cpu"
- ["big_mem", "big_cpu"]
*/
if (drctv.containsKey("label")) {
if (drctv["label"] instanceof CharSequence) {
drctv["label"] = [ drctv["label"] ]
}
assert drctv["label"] instanceof List
drctv["label"].forEach { label ->
assert label instanceof CharSequence
// assert label.matches("[a-zA-Z0-9]([a-zA-Z0-9_]*[a-zA-Z0-9])?")
// ^ does not allow closures
}
}
/* DIRECTIVE scratch
accepted examples:
- true
- "/path/to/scratch"
- '$MY_PATH_TO_SCRATCH'
- "ram-disk"
*/
if (drctv.containsKey("scratch")) {
assert drctv["scratch"] == true || drctv["scratch"] instanceof CharSequence
}
/* DIRECTIVE storeDir
accepted examples:
- "/path/to/storeDir"
*/
if (drctv.containsKey("storeDir")) {
assert drctv["storeDir"] instanceof CharSequence
}
/* DIRECTIVE stageInMode
accepted examples:
- "copy"
- "link"
*/
if (drctv.containsKey("stageInMode")) {
assert drctv["stageInMode"] instanceof CharSequence
assert drctv["stageInMode"] in ["copy", "link", "symlink", "rellink"]
}
/* DIRECTIVE stageOutMode
accepted examples:
- "copy"
- "link"
*/
if (drctv.containsKey("stageOutMode")) {
assert drctv["stageOutMode"] instanceof CharSequence
assert drctv["stageOutMode"] in ["copy", "move", "rsync"]
}
/* DIRECTIVE tag
accepted examples:
- "foo"
- '$id'
*/
if (drctv.containsKey("tag")) {
assert drctv["tag"] instanceof CharSequence
}
/* DIRECTIVE time
accepted examples:
- "1h"
- "2days"
- "1day 6hours 3minutes 30seconds"
*/
if (drctv.containsKey("time")) {
assert drctv["time"] instanceof CharSequence
// todo: validation regex?
}
return drctv
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/workflowFactory/processWorkflowArgs.nf'
/**
 * Merges the arguments passed to '.run()' with the module defaults and
 * validates and normalises the result.
 *
 * Steps, in order:
 *   1. 'args' override 'defaultWfArgs'.
 *   2. 'key' is resolved (a Closure is applied to the module name) and checked.
 *   3. Unknown arguments are rejected.
 *   4. 'directives' and 'auto' are merged with their defaults and passed
 *      through processDirectives / processAuto.
 *   5. If auto.publish is true and no publishDir directive was given, one is
 *      derived from getPublishDir().
 *   6. If auto.transcript is true, a transcripts publishDir entry is appended.
 *   7. On a stub run, the publishDir, cpus, memory and label directives are removed.
 *   8. Closure-typed arguments are type-checked, deprecated ones are logged.
 *   9. 'fromState' and 'toState' are turned into closures.
 *
 * @param args          The arguments passed by the caller.
 * @param defaultWfArgs The default workflow arguments of the module.
 * @param meta          Module metadata; 'meta.config' is read here.
 * @return The processed workflow arguments map.
 * @throws AssertionError when a required argument is missing or an argument is invalid.
 */
def processWorkflowArgs(Map args, Map defaultWfArgs, Map meta) {
  // override defaults with args
  def workflowArgs = defaultWfArgs + args

  // check whether 'key' exists
  assert workflowArgs.containsKey("key") : "Error in module '${meta.config.name}': key is a required argument"

  // if 'key' is a closure, apply it to the original key
  if (workflowArgs["key"] instanceof Closure) {
    workflowArgs["key"] = workflowArgs["key"](meta.config.name)
  }
  def key = workflowArgs["key"]
  assert key instanceof CharSequence : "Expected process argument 'key' to be a String. Found: class ${key.getClass()}"
  assert key ==~ /^[a-zA-Z_]\w*$/ : "Error in module '$key': Expected process argument 'key' to consist of only letters, digits or underscores. Found: ${key}"

  // check for any unexpected keys
  def expectedKeys = ["key", "directives", "auto", "map", "mapId", "mapData", "mapPassthrough", "filter", "runIf", "fromState", "toState", "args", "renameKeys", "debug"]
  def unexpectedKeys = workflowArgs.keySet() - expectedKeys
  assert unexpectedKeys.isEmpty() : "Error in module '$key': unexpected arguments to the '.run()' function: '${unexpectedKeys.join("', '")}'"

  // check whether directives exists and apply defaults
  assert workflowArgs.containsKey("directives") : "Error in module '$key': directives is a required argument"
  assert workflowArgs["directives"] instanceof Map : "Error in module '$key': Expected process argument 'directives' to be a Map. Found: class ${workflowArgs['directives'].getClass()}"
  workflowArgs["directives"] = processDirectives(defaultWfArgs.directives + workflowArgs["directives"])

  // check whether auto exists and apply defaults
  assert workflowArgs.containsKey("auto") : "Error in module '$key': auto is a required argument"
  assert workflowArgs["auto"] instanceof Map : "Error in module '$key': Expected process argument 'auto' to be a Map. Found: class ${workflowArgs['auto'].getClass()}"
  workflowArgs["auto"] = processAuto(defaultWfArgs.auto + workflowArgs["auto"])

  // auto define publish, if so desired
  if (workflowArgs.auto.publish == true && (workflowArgs.directives.publishDir != null ? workflowArgs.directives.publishDir : [:]).isEmpty()) {
    // can't assert at this level thanks to the no_publish profile
    // assert params.containsKey("publishDir") || params.containsKey("publish_dir") :
    //   "Error in module '${workflowArgs['key']}': if auto.publish is true, params.publish_dir needs to be defined.\n" +
    //   "  Example: params.publish_dir = \"./output/\""
    def publishDir = getPublishDir()

    if (publishDir != null) {
      workflowArgs.directives.publishDir = [[
        path: publishDir,
        saveAs: "{ it.startsWith('.') ? null : it }", // don't publish hidden files, by default
        mode: "copy"
      ]]
    }
  }

  // auto define transcript, if so desired
  if (workflowArgs.auto.transcript == true) {
    // can't assert at this level thanks to the no_publish profile
    // assert params.containsKey("transcriptsDir") || params.containsKey("transcripts_dir") || params.containsKey("publishDir") || params.containsKey("publish_dir") :
    //   "Error in module '${workflowArgs['key']}': if auto.transcript is true, either params.transcripts_dir or params.publish_dir needs to be defined.\n" +
    //   "  Example: params.transcripts_dir = \"./transcripts/\""
    def transcriptsDir =
      params.containsKey("transcripts_dir") ? params.transcripts_dir :
      params.containsKey("transcriptsDir") ? params.transcriptsDir :
      params.containsKey("publish_dir") ? params.publish_dir + "/_transcripts" :
      params.containsKey("publishDir") ? params.publishDir + "/_transcripts" :
      null
    if (transcriptsDir != null) {
      def timestamp = nextflow.Nextflow.getSession().getWorkflowMetadata().start.format('yyyy-MM-dd_HH-mm-ss')
      // the escaped \${...} parts are kept as literal text in the stored path
      def transcriptsPublishDir = [
        path: "$transcriptsDir/$timestamp/\${task.process.replaceAll(':', '-')}/\${id}/",
        saveAs: "{ it.startsWith('.') ? it.replaceAll('^.', '') : null }",
        mode: "copy"
      ]
      // start from the existing publishDir entries, or an empty list when none are set
      def publishDirs = workflowArgs.directives.publishDir != null ? workflowArgs.directives.publishDir : []
      workflowArgs.directives.publishDir = publishDirs + transcriptsPublishDir
    }
  }

  // if this is a stubrun, remove certain directives?
  if (workflow.stubRun) {
    workflowArgs.directives.keySet().removeAll(["publishDir", "cpus", "memory", "label"])
  }

  // these arguments must be closures when they are set
  for (nam in ["map", "mapId", "mapData", "mapPassthrough", "filter", "runIf"]) {
    if (workflowArgs.containsKey(nam) && workflowArgs[nam]) {
      assert workflowArgs[nam] instanceof Closure : "Error in module '$key': Expected process argument '$nam' to be null or a Closure. Found: class ${workflowArgs[nam].getClass()}"
    }
  }

  // TODO: should functions like 'map', 'mapId', 'mapData', 'mapPassthrough' be deprecated as well?
  for (nam in ["map", "mapData", "mapPassthrough", "renameKeys"]) {
    if (workflowArgs.containsKey(nam) && workflowArgs[nam] != null) {
      log.warn "module '$key': workflow argument '$nam' is deprecated and will be removed in Viash 0.9.0. Please use 'fromState' and 'toState' instead."
    }
  }

  // check fromState
  workflowArgs["fromState"] = _processFromState(workflowArgs.get("fromState"), key, meta.config)

  // check toState
  workflowArgs["toState"] = _processToState(workflowArgs.get("toState"), key, meta.config)

  // return output
  return workflowArgs
}
/**
 * Normalises the 'fromState' workflow argument into a closure (or null).
 *
 * Accepted forms:
 *   - null: returned as null.
 *   - Closure: returned unchanged.
 *   - List of strings: treated as a Map that maps each name onto itself.
 *   - Map [newkey: origkey]: converted into a closure that receives a tuple
 *     whose second element is the state Map, and returns a Map containing
 *     'newkey -> state[origkey]' for every 'origkey' present in the state.
 *
 * @param fromState The raw 'fromState' argument.
 * @param key_      Module key, used in error messages.
 * @param config_   Module config; its 'allArguments' list is read.
 * @return null, the given Closure, or a Closure built from the List/Map.
 * @throws AssertionError when 'fromState' has an unsupported type or holds non-String keys/values.
 */
def _processFromState(fromState, key_, config_) {
  assert fromState == null || fromState instanceof Closure || fromState instanceof Map || fromState instanceof List :
    "Error in module '$key_': Expected process argument 'fromState' to be null, a Closure, a Map, or a List. Found: class ${fromState.getClass()}"
  if (fromState == null) {
    return null
  }

  // if fromState is a List, convert to map
  if (fromState instanceof List) {
    // check whether fromstate is a list[string]
    assert fromState.every{it instanceof CharSequence} : "Error in module '$key_': fromState is a List, but not all elements are Strings"
    fromState = fromState.collectEntries{[it, it]}
  }

  // if fromState is a map, convert to closure
  if (fromState instanceof Map) {
    // check whether fromstate is a map[string, string]
    assert fromState.values().every{it instanceof CharSequence} : "Error in module '$key_': fromState is a Map, but not all values are Strings"
    assert fromState.keySet().every{it instanceof CharSequence} : "Error in module '$key_': fromState is a Map, but not all keys are Strings"
    def fromStateMap = fromState.clone()
    // use the config that was passed in rather than a script-level 'meta' variable,
    // so that this helper only depends on its own parameters
    // NOTE(review): the direction is compared against "Input" here, whereas the
    // config in this file declares "direction" : "input" in lowercase. If the
    // direction is never capitalised, this list is always empty and the
    // exception below cannot be thrown. Left as is, since tightening it could
    // make existing pipelines fail; confirm the intended behaviour.
    def requiredInputNames = config_.allArguments.findAll{it.required && it.direction == "Input"}.collect{it.plainName}
    // turn the map into a closure to be used later on
    fromState = { it ->
      def state = it[1]
      assert state instanceof Map : "Error in module '$key_': the state is not a Map"
      def data = fromStateMap.collectMany{newkey, origkey ->
        // check whether newkey corresponds to a required argument
        if (state.containsKey(origkey)) {
          [[newkey, state[origkey]]]
        } else if (!requiredInputNames.contains(origkey)) {
          []
        } else {
          throw new Exception("Error in module '$key_': fromState key '$origkey' not found in current state")
        }
      }.collectEntries()
      data
    }
  }

  return fromState
}
/**
 * Normalises the 'toState' workflow argument into a closure.
 *
 * Accepted forms:
 *   - null: replaced by a closure returning the second element of its tuple.
 *   - Closure: returned unchanged.
 *   - List of strings: treated as a Map that maps each name onto itself.
 *   - Map [newkey: origkey]: converted into a closure that receives a tuple
 *     with the output Map at index 1 and the state Map at index 2, and
 *     returns the state extended with 'newkey -> output[origkey]' for every
 *     'origkey' present in the output.
 *
 * @param toState The raw 'toState' argument.
 * @param key_    Module key, used in error messages.
 * @param config_ Module config; its 'allArguments' list is read.
 * @return A Closure computing the new state.
 * @throws AssertionError when 'toState' has an unsupported type or holds non-String keys/values.
 */
def _processToState(toState, key_, config_) {
  // without an explicit toState, the module output becomes the new state
  if (toState == null) {
    toState = { tup -> tup[1] }
  }

  // only a closure, a map[string, string] or a list[string] is acceptable
  assert toState instanceof Closure || toState instanceof Map || toState instanceof List :
    "Error in module '$key_': Expected process argument 'toState' to be a Closure, a Map, or a List. Found: class ${toState.getClass()}"

  // a list of names is shorthand for an identity mapping
  if (toState instanceof List) {
    assert toState.every{it instanceof CharSequence} : "Error in module '$key_': toState is a List, but not all elements are Strings"
    toState = toState.collectEntries{[it, it]}
  }

  // a mapping is wrapped into a closure that applies it
  if (toState instanceof Map) {
    assert toState.values().every{it instanceof CharSequence} : "Error in module '$key_': toState is a Map, but not all values are Strings"
    assert toState.keySet().every{it instanceof CharSequence} : "Error in module '$key_': toState is a Map, but not all keys are Strings"

    def toStateMap = toState.clone()
    def requiredOutputNames = config_.allArguments
      .findAll{it.required && it.direction == "Output"}
      .collect{it.plainName}

    toState = { it ->
      def output = it[1]
      def state = it[2]
      assert output instanceof Map : "Error in module '$key_': the output is not a Map"
      assert state instanceof Map : "Error in module '$key_': the state is not a Map"

      // gather the renamed output values, in the order of the mapping
      def extraEntries = [:]
      toStateMap.each { newkey, origkey ->
        if (output.containsKey(origkey)) {
          extraEntries[newkey] = output[origkey]
        } else if (requiredOutputNames.contains(origkey)) {
          throw new Exception("Error in module '$key_': toState key '$origkey' not found in current output")
        }
        // keys that are absent and not required are silently skipped
      }
      state + extraEntries
    }
  }

  return toState
}
// helper file: 'src/main/resources/io/viash/runners/nextflow/workflowFactory/workflowFactory.nf'
/**
 * Channel step that prints every tuple passing through when debugging is
 * switched on for the module, and is a pass-through 'map' otherwise.
 *
 * @param workflowArgs The processed workflow arguments; 'debug' and 'key' are read.
 * @param debugKey     Label included in the printed message.
 */
def _debug(workflowArgs, debugKey) {
  // debugging disabled: forward every item unchanged
  if (!workflowArgs.debug) {
    return map { it }
  }
  return view { "process '${workflowArgs.key}' $debugKey tuple: $it" }
}
// depends on: innerWorkflowFactory
/**
 * Builds the workflow that wraps a module.
 *
 * The generated workflow takes a channel of tuples [id, data, ...passthrough]
 * and performs, in order:
 *   1. id uniqueness check, optional debug view, and the (deprecated) map /
 *      mapId / mapData / mapPassthrough / renameKeys transformations, together
 *      with structural checks on each tuple;
 *   2. optional 'runIf' branching (non-matching tuples bypass the module and
 *      are concatenated onto the result) and optional 'filter';
 *   3. 'fromState' to derive the module arguments from the state;
 *   4. merging of argument defaults, 'params' overrides ("<key>__<arg>"),
 *      workflowArgs.args and the data itself;
 *   5. running the inner workflow via innerWorkflowFactory;
 *   6. tagging each output event with the index of the channel it came from,
 *      grouping the events per id and merging their states;
 *   7. joining with the previous state and applying 'toState';
 *   8. optional publishing when auto.publish == "state".
 *
 * The returned workflow additionally gets a 'run' method, which creates a new
 * workflow with overridden arguments, and a 'config' property.
 *
 * @param args          Arguments passed to '.run()'.
 * @param defaultWfArgs Default workflow arguments of the module.
 * @param meta          Module metadata; 'meta.config' is read.
 * @return The workflow, cloned under the name given by the processed 'key'.
 */
def workflowFactory(Map args, Map defaultWfArgs, Map meta) {
  def workflowArgs = processWorkflowArgs(args, defaultWfArgs, meta)
  def key_ = workflowArgs["key"]
  def multipleArgs = meta.config.allArguments.findAll{ it.multiple }.collect{it.plainName}

  workflow workflowInstance {
    take: input_

    main:
    def chModified = input_
      | checkUniqueIds([:])
      | _debug(workflowArgs, "input")
      | map { tuple ->
        tuple = deepClone(tuple)

        if (workflowArgs.map) {
          tuple = workflowArgs.map(tuple)
        }
        if (workflowArgs.mapId) {
          tuple[0] = workflowArgs.mapId(tuple[0])
        }
        if (workflowArgs.mapData) {
          tuple[1] = workflowArgs.mapData(tuple[1])
        }
        if (workflowArgs.mapPassthrough) {
          tuple = tuple.take(2) + workflowArgs.mapPassthrough(tuple.drop(2))
        }

        // check tuple
        assert tuple instanceof List :
          "Error in module '${key_}': element in channel should be a tuple [id, data, ...otherargs...]\n" +
          " Example: [\"id\", [input: file('foo.txt'), arg: 10]].\n" +
          " Expected class: List. Found: tuple.getClass() is ${tuple.getClass()}"
        assert tuple.size() >= 2 :
          "Error in module '${key_}': expected length of tuple in input channel to be two or greater.\n" +
          " Example: [\"id\", [input: file('foo.txt'), arg: 10]].\n" +
          " Found: tuple.size() == ${tuple.size()}"

        // check id field
        if (tuple[0] instanceof GString) {
          tuple[0] = tuple[0].toString()
        }
        assert tuple[0] instanceof CharSequence :
          "Error in module '${key_}': first element of tuple in channel should be a String\n" +
          " Example: [\"id\", [input: file('foo.txt'), arg: 10]].\n" +
          " Found: ${tuple[0]}"

        // match file to input file
        if (workflowArgs.auto.simplifyInput && (tuple[1] instanceof Path || tuple[1] instanceof List)) {
          def inputFiles = meta.config.allArguments
            .findAll { it.type == "file" && it.direction == "input" }

          assert inputFiles.size() == 1 :
            "Error in module '${key_}' id '${tuple[0]}'.\n" +
            " Anonymous file inputs are only allowed when the process has exactly one file input.\n" +
            " Expected: inputFiles.size() == 1. Found: inputFiles.size() is ${inputFiles.size()}"

          tuple[1] = [[ inputFiles[0].plainName, tuple[1] ]].collectEntries()
        }

        // check data field
        assert tuple[1] instanceof Map :
          "Error in module '${key_}' id '${tuple[0]}': second element of tuple in channel should be a Map\n" +
          " Example: [\"id\", [input: file('foo.txt'), arg: 10]].\n" +
          " Expected class: Map. Found: tuple[1].getClass() is ${tuple[1].getClass()}"

        // rename keys of data field in tuple
        if (workflowArgs.renameKeys) {
          assert workflowArgs.renameKeys instanceof Map :
            "Error renaming data keys in module '${key_}' id '${tuple[0]}'.\n" +
            " Example: renameKeys: ['new_key': 'old_key'].\n" +
            " Expected class: Map. Found: renameKeys.getClass() is ${workflowArgs.renameKeys.getClass()}"
          assert tuple[1] instanceof Map :
            "Error renaming data keys in module '${key_}' id '${tuple[0]}'.\n" +
            " Expected class: Map. Found: tuple[1].getClass() is ${tuple[1].getClass()}"

          // TODO: allow renameKeys to be a function?
          workflowArgs.renameKeys.each { newKey, oldKey ->
            assert newKey instanceof CharSequence :
              "Error renaming data keys in module '${key_}' id '${tuple[0]}'.\n" +
              " Example: renameKeys: ['new_key': 'old_key'].\n" +
              " Expected class of newKey: String. Found: newKey.getClass() is ${newKey.getClass()}"
            assert oldKey instanceof CharSequence :
              "Error renaming data keys in module '${key_}' id '${tuple[0]}'.\n" +
              " Example: renameKeys: ['new_key': 'old_key'].\n" +
              " Expected class of oldKey: String. Found: oldKey.getClass() is ${oldKey.getClass()}"
            // the module key is held in 'key_'; no variable named 'key' exists in this scope
            assert tuple[1].containsKey(oldKey) :
              "Error renaming data keys in module '${key_}' id '${tuple[0]}'.\n" +
              " Key '$oldKey' is missing in the data map. tuple[1].keySet() is '${tuple[1].keySet()}'"
            tuple[1].put(newKey, tuple[1][oldKey])
          }
          tuple[1].keySet().removeAll(workflowArgs.renameKeys.collect{ newKey, oldKey -> oldKey })
        }
        tuple
      }

    // split off the tuples for which the module should not run
    def chRun = null
    def chPassthrough = null
    if (workflowArgs.runIf) {
      def runIfBranch = chModified.branch{ tup ->
        run: workflowArgs.runIf(tup[0], tup[1])
        passthrough: true
      }
      chRun = runIfBranch.run
      chPassthrough = runIfBranch.passthrough
    } else {
      chRun = chModified
      chPassthrough = Channel.empty()
    }

    def chRunFiltered = workflowArgs.filter ?
      chRun | filter{workflowArgs.filter(it)} :
      chRun

    def chArgs = workflowArgs.fromState ?
      chRunFiltered | map{
        def new_data = workflowArgs.fromState(it.take(2))
        [it[0], new_data]
      } :
      chRunFiltered | map {tup -> tup.take(2)}

    // fill in defaults
    def chArgsWithDefaults = chArgs
      | map { tuple ->
        def id_ = tuple[0]
        def data_ = tuple[1]

        // TODO: could move fromState to here

        // fetch default params from functionality
        def defaultArgs = meta.config.allArguments
          .findAll { it.containsKey("default") }
          .collectEntries { [ it.plainName, it.default ] }

        // fetch overrides in params
        def paramArgs = meta.config.allArguments
          .findAll { par ->
            def argKey = key_ + "__" + par.plainName
            params.containsKey(argKey)
          }
          .collectEntries { [ it.plainName, params[key_ + "__" + it.plainName] ] }

        // fetch overrides in data
        def dataArgs = meta.config.allArguments
          .findAll { data_.containsKey(it.plainName) }
          .collectEntries { [ it.plainName, data_[it.plainName] ] }

        // combine params
        def combinedArgs = defaultArgs + paramArgs + workflowArgs.args + dataArgs

        // remove arguments with explicit null values
        combinedArgs
          .removeAll{_, val -> val == null || val == "viash_no_value" || val == "force_null"}

        combinedArgs = _processInputValues(combinedArgs, meta.config, id_, key_)

        [id_, combinedArgs] + tuple.drop(2)
      }

    // TODO: move some of the _meta.join_id wrangling to the safeJoin() function.
    def chInitialOutputMulti = chArgsWithDefaults
      | _debug(workflowArgs, "processed")
      // run workflow
      | innerWorkflowFactory(workflowArgs)
    def chInitialOutputList = chInitialOutputMulti instanceof List ? chInitialOutputMulti : [chInitialOutputMulti]
    assert chInitialOutputList.size() > 0: "should have emitted at least one output channel"
    // Add a channel ID to the events, which designates the channel the event was emitted from as a running number
    // This number is used to sort the events later when the events are gathered from across the channels.
    def chInitialOutputListWithIndexedEvents = chInitialOutputList.withIndex().collect{channel, channelIndex ->
      def newChannel = channel
        | map {tuple ->
          assert tuple instanceof List :
            "Error in module '${key_}': element in output channel should be a tuple [id, data, ...otherargs...]\n" +
            " Example: [\"id\", [input: file('foo.txt'), arg: 10]].\n" +
            " Expected class: List. Found: tuple.getClass() is ${tuple.getClass()}"

          def newEvent = [channelIndex] + tuple
          return newEvent
        }
      return newChannel
    }
    // Put the events into 1 channel, cover case where there is only one channel is emitted
    def chInitialOutput = chInitialOutputList.size() > 1 ? \
      chInitialOutputListWithIndexedEvents[0].mix(*chInitialOutputListWithIndexedEvents.tail()) : \
      chInitialOutputListWithIndexedEvents[0]
    def chInitialOutputProcessed = chInitialOutput
      | map { tuple ->
        def channelId = tuple[0]
        def id_ = tuple[1]
        def output_ = tuple[2]

        // see if output map contains metadata
        def meta_ =
          output_ instanceof Map && output_.containsKey("_meta") ?
          output_["_meta"] :
          [:]
        def join_id = meta_.join_id ?: id_

        // remove metadata
        output_ = output_.findAll{k, v -> k != "_meta"}

        // check value types
        output_ = _checkValidOutputArgument(output_, meta.config, id_, key_)

        [join_id, channelId, id_, output_]
      }
      // | view{"chInitialOutput: ${it.take(3)}"}

    // join the output [prev_id, channel_id, new_id, output] with the previous state [prev_id, state, ...]
    def chPublishWithPreviousState = safeJoin(chInitialOutputProcessed, chRunFiltered, key_)
      // input tuple format: [join_id, channel_id, id, output, prev_state, ...]
      // output tuple format: [join_id, channel_id, id, new_state, ...]
      | map{ tup ->
        def new_state = workflowArgs.toState(tup.drop(2).take(3))
        tup.take(3) + [new_state] + tup.drop(5)
      }

    if (workflowArgs.auto.publish == "state") {
      def chPublishFiles = chPublishWithPreviousState
        // input tuple format: [join_id, channel_id, id, new_state, ...]
        // output tuple format: [join_id, channel_id, id, new_state]
        | map{ tup ->
          tup.take(4)
        }

      safeJoin(chPublishFiles, chArgsWithDefaults, key_)
        // input tuple format: [join_id, channel_id, id, new_state, orig_state, ...]
        // output tuple format: [id, new_state, orig_state]
        | map { tup ->
          tup.drop(2).take(3)
        }
        | publishFilesByConfig(key: key_, config: meta.config)
    }
    // Join the state from the events that were emitted from different channels
    def chJoined = chInitialOutputProcessed
      | map {tuple ->
        def join_id = tuple[0]
        def channel_id = tuple[1]
        def id = tuple[2]
        def other = tuple.drop(3)
        // Below, groupTuple is used to join the events. To make sure resuming a workflow
        // keeps working, the output state must be deterministic. This means the state needs to be
        // sorted with groupTuple's has a 'sort' argument. This argument can be set to 'hash',
        // but hashing the state when it is large can be problematic in terms of performance.
        // Therefore, a custom comparator function is provided. We add the channel ID to the
        // states so that we can use the channel ID to sort the items.
        def stateWithChannelID = [[channel_id] * other.size(), other].transpose()
        // A comparator that is provided to groupTuple's 'sort' argument is applied
        // to all elements of the event tuple (that is not the 'id'). The comparator
        // closure that is used below expects the input to be List. So the join_id and
        // channel_id must also be wrapped in a list.
        [[join_id], [channel_id], id] + stateWithChannelID
      }
      | groupTuple(by: 2, sort: {a, b -> a[0] <=> b[0]}, size: chInitialOutputList.size(), remainder: true)
      | map {join_ids, _, id, statesWithChannelID ->
        // Remove the channel IDs from the states
        def states = statesWithChannelID.collect{it[1]}
        def newJoinId = join_ids.flatten().unique{a, b -> a <=> b}
        assert newJoinId.size() == 1: "Multiple events were emitted for '$id'."
        def newJoinIdUnique = newJoinId[0]
        // Merge the states from the different channels
        def newState = states.inject([:]){ old_state, state_to_add ->
          return old_state + state_to_add.collectEntries{k, v ->
            if (!multipleArgs.contains(k)) {
              // if the key is not a multiple argument, we expect only one value
              if (old_state.containsKey(k)) {
                assert old_state[k] == v : "ID $id: multiple entries for argument $k were emitted."
              }
              [k, v]
            } else {
              // if the key is a multiple argument, append the different values into one list
              def prevValue = old_state.getOrDefault(k, [])
              def prevValueAsList = prevValue instanceof List ? prevValue : [prevValue]
              [k, prevValueAsList + v]
            }
          }
        }

        _checkAllRequiredOuputsPresent(newState, meta.config, id, key_)

        // simplify output if need be
        if (workflowArgs.auto.simplifyOutput && newState.size() == 1) {
          newState = newState.values()[0]
        }

        return [newJoinIdUnique, id, newState]
      }

    // join the output [prev_id, new_id, output] with the previous state [prev_id, state, ...]
    def chNewState = safeJoin(chJoined, chRunFiltered, key_)
      // input tuple format: [join_id, id, output, prev_state, ...]
      // output tuple format: [join_id, id, new_state, ...]
      | map{ tup ->
        def new_state = workflowArgs.toState(tup.drop(1).take(3))
        tup.take(2) + [new_state] + tup.drop(4)
      }

    if (workflowArgs.auto.publish == "state") {
      def chPublishStates = chNewState
        // input tuple format: [join_id, id, new_state, ...]
        // output tuple format: [join_id, id, new_state]
        | map{ tup ->
          tup.take(3)
        }

      safeJoin(chPublishStates, chArgsWithDefaults, key_)
        // input tuple format: [join_id, id, new_state, orig_state, ...]
        // output tuple format: [id, new_state, orig_state]
        | map { tup ->
          tup.drop(1).take(3)
        }
        | publishStatesByConfig(key: key_, config: meta.config)
    }

    // 'chReturn' is assigned without 'def', as in the original code, and emitted below
    chReturn = chNewState
      | map { tup ->
        // input tuple format: [join_id, id, new_state, ...]
        // output tuple format: [id, new_state, ...]
        tup.drop(1)
      }
      | _debug(workflowArgs, "output")
      | concat(chPassthrough)

    emit: chReturn
  }

  def wf = workflowInstance.cloneWithName(key_)

  // add factory function
  wf.metaClass.run = { runArgs ->
    workflowFactory(runArgs, workflowArgs, meta)
  }
  // add config to module for later introspection
  wf.metaClass.config = meta.config

  return wf
}
nextflow.enable.dsl=2
// START COMPONENT-SPECIFIC CODE
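// create meta object
// - resources_dir: the resolved, normalized directory of this module
// - config: the component configuration, parsed from the embedded JSON blob
//   below and post-processed by processConfig()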
meta = [
"resources_dir": moduleDir.toRealPath().normalize(),
"config": processConfig(readJsonBlob('''{
"name" : "parallel_map",
"version" : "main",
"authors" : [
{
"name" : "Dries Schaumont",
"roles" : [
"maintainer"
],
"info" : {
"links" : {
"email" : "dries@data-intuitive.com",
"github" : "DriesSchaumont",
"orcid" : "0000-0002-4389-0440",
"linkedin" : "dries-schaumont"
},
"organizations" : [
{
"name" : "Data Intuitive",
"href" : "https://www.data-intuitive.com",
"role" : "Data Scientist"
}
]
}
},
{
"name" : "Toni Verbeiren",
"roles" : [
"author",
"maintainer"
],
"info" : {
"role" : "Core Team Member",
"links" : {
"github" : "tverbeiren",
"linkedin" : "verbeiren"
},
"organizations" : [
{
"name" : "Data Intuitive",
"href" : "https://www.data-intuitive.com",
"role" : "Data Scientist and CEO"
}
]
}
}
],
"argument_groups" : [
{
"name" : "Input arguments",
"arguments" : [
{
"type" : "file",
"name" : "--input_r1",
"description" : "Input FASTQ files for the forward reads. All FASTQ file names must start with the prefix '{well_id}_R1', where\n'well_id' can be found as the sequence identifier in the barcodes FASTA file (see 'barcodesFasta' argument).\nFor each FASTQ file, a matching FASTQ file for the reverse reads must be provided to the 'input_r2' argument,\nmeaning that their 'well_id' prefix must match. The number of items provided for 'input_r1' must be equal\nto the number of items for 'input_r2'.\n",
"must_exist" : true,
"create_parent" : true,
"required" : true,
"direction" : "input",
"multiple" : true,
"multiple_sep" : ";"
},
{
"type" : "file",
"name" : "--input_r2",
"description" : "Input FASTQ files for the reverse reads. All FASTQ file names must start with the prefix '{well_id}_R2', where\n'well_id' can be found as the sequence identifier in the barcodes FASTA file (see 'barcodesFasta' argument).\nFor each FASTQ file, a matching FASTQ file for the forward reads must be provided to the 'input_r1' argument,\nmeaning that their 'well_id' prefix must match. The number of items provided for 'input_r1' must be equal\nto the number of items for 'input_r2'.\n",
"must_exist" : true,
"create_parent" : true,
"required" : true,
"direction" : "input",
"multiple" : true,
"multiple_sep" : ";"
},
{
"type" : "file",
"name" : "--genomeDir",
"description" : "Reference genome to match to. Can be generated from genomic FASTA sequences and a genome annotation\nby using STAR with '--runMode genomeGenerate'.\n",
"must_exist" : true,
"create_parent" : true,
"required" : true,
"direction" : "input",
"multiple" : false,
"multiple_sep" : ";"
},
{
"type" : "file",
"name" : "--barcodesFasta",
"description" : "FASTA file where each entry specifies a unique barcode sequence present at the start of the forward input reads\n(input_r1). The IDs of each barcode (the start of the FASTA headers up until the first whitespace character) must\nmatch with the start of one input FASTQ pair.\n",
"must_exist" : true,
"create_parent" : true,
"required" : true,
"direction" : "input",
"multiple" : false,
"multiple_sep" : ";"
}
]
},
{
"name" : "Barcode arguments",
"arguments" : [
{
"type" : "integer",
"name" : "--umiLength",
"description" : "Length of the Unique Molecular Identifiers (UMI). The UMI are expected to be located after the barcodes in the\nforwards reads.\n",
"required" : true,
"direction" : "input",
"multiple" : false,
"multiple_sep" : ";"
},
{
"type" : "string",
"name" : "--limitBAMsortRAM",
"default" : [
"10000000000"
],
"required" : false,
"direction" : "input",
"multiple" : false,
"multiple_sep" : ";"
}
]
},
{
"name" : "Runtime arguments",
"arguments" : [
{
"type" : "integer",
"name" : "--runThreadN",
"description" : "Number of threads to use for a single STAR execution.",
"default" : [
1
],
"required" : false,
"direction" : "input",
"multiple" : false,
"multiple_sep" : ";"
}
]
},
{
"name" : "Output arguments",
"arguments" : [
{
"type" : "file",
"name" : "--output",
"description" : "A list of output folders which are the result of using STAR to map each input FASTQ pair STAR to the reference genome.\nThe order of the items DO NOT match with the order of the entries in the barcodes FASTA file or the input FASTQ pairs. \n",
"default" : [
"./*"
],
"must_exist" : true,
"create_parent" : true,
"required" : true,
"direction" : "output",
"multiple" : true,
"multiple_sep" : ";"
},
{
"type" : "file",
"name" : "--joblog",
"description" : "Where to store the log file listing all the jobs.",
"default" : [
"execution_log.txt"
],
"must_exist" : true,
"create_parent" : true,
"required" : false,
"direction" : "output",
"multiple" : false,
"multiple_sep" : ";"
}
]
}
],
"resources" : [
{
"type" : "bash_script",
"path" : "script.sh",
"is_executable" : true
},
{
"type" : "file",
"path" : "STAR"
},
{
"type" : "file",
"path" : "/src/config/labels.config",
"dest" : "nextflow_labels.config"
}
],
"description" : "Map wells in batch, using STAR\nSpliced Transcripts Alignment to a Reference (C) Alexander Dobin\nhttps://github.com/alexdobin/STAR\n",
"test_resources" : [
{
"type" : "bash_script",
"path" : "test.sh",
"is_executable" : true
}
],
"status" : "enabled",
"scope" : {
"image" : "public",
"target" : "public"
},
"requirements" : {
"commands" : [
"ps"
]
},
"license" : "MIT",
"links" : {
"repository" : "https://github.com/viash-hub/htrnaseq"
},
"runners" : [
{
"type" : "executable",
"id" : "executable",
"docker_setup_strategy" : "ifneedbepullelsecachedbuild"
},
{
"type" : "nextflow",
"id" : "nextflow",
"directives" : {
"tag" : "$id"
},
"auto" : {
"simplifyInput" : true,
"simplifyOutput" : false,
"transcript" : false,
"publish" : false
},
"config" : {
"labels" : {
"mem1gb" : "memory = 1000000000.B",
"mem2gb" : "memory = 2000000000.B",
"mem5gb" : "memory = 5000000000.B",
"mem10gb" : "memory = 10000000000.B",
"mem20gb" : "memory = 20000000000.B",
"mem50gb" : "memory = 50000000000.B",
"mem100gb" : "memory = 100000000000.B",
"mem200gb" : "memory = 200000000000.B",
"mem500gb" : "memory = 500000000000.B",
"mem1tb" : "memory = 1000000000000.B",
"mem2tb" : "memory = 2000000000000.B",
"mem5tb" : "memory = 5000000000000.B",
"mem10tb" : "memory = 10000000000000.B",
"mem20tb" : "memory = 20000000000000.B",
"mem50tb" : "memory = 50000000000000.B",
"mem100tb" : "memory = 100000000000000.B",
"mem200tb" : "memory = 200000000000000.B",
"mem500tb" : "memory = 500000000000000.B",
"mem1gib" : "memory = 1073741824.B",
"mem2gib" : "memory = 2147483648.B",
"mem4gib" : "memory = 4294967296.B",
"mem8gib" : "memory = 8589934592.B",
"mem16gib" : "memory = 17179869184.B",
"mem32gib" : "memory = 34359738368.B",
"mem64gib" : "memory = 68719476736.B",
"mem128gib" : "memory = 137438953472.B",
"mem256gib" : "memory = 274877906944.B",
"mem512gib" : "memory = 549755813888.B",
"mem1tib" : "memory = 1099511627776.B",
"mem2tib" : "memory = 2199023255552.B",
"mem4tib" : "memory = 4398046511104.B",
"mem8tib" : "memory = 8796093022208.B",
"mem16tib" : "memory = 17592186044416.B",
"mem32tib" : "memory = 35184372088832.B",
"mem64tib" : "memory = 70368744177664.B",
"mem128tib" : "memory = 140737488355328.B",
"mem256tib" : "memory = 281474976710656.B",
"mem512tib" : "memory = 562949953421312.B",
"cpu1" : "cpus = 1",
"cpu2" : "cpus = 2",
"cpu5" : "cpus = 5",
"cpu10" : "cpus = 10",
"cpu20" : "cpus = 20",
"cpu50" : "cpus = 50",
"cpu100" : "cpus = 100",
"cpu200" : "cpus = 200",
"cpu500" : "cpus = 500",
"cpu1000" : "cpus = 1000"
},
"script" : [
"includeConfig(\\"nextflow_labels.config\\")"
]
},
"debug" : false,
"container" : "docker"
}
],
"engines" : [
{
"type" : "docker",
"id" : "docker",
"image" : "debian:stable-slim",
"target_registry" : "images.viash-hub.com",
"target_tag" : "main",
"namespace_separator" : "/",
"setup" : [
{
"type" : "apt",
"packages" : [
"procps",
"wget",
"automake",
"make",
"gcc",
"g++",
"zlib1g-dev",
"parallel",
"file",
"seqkit"
],
"interactive" : false
},
{
"type" : "docker",
"copy" : [
"STAR /usr/local/bin/$STAR_BINARY"
],
"build_args" : [
"STAR_V=2.7.6a"
],
"env" : [
"STAR_SOURCE=\\"https://github.com/alexdobin/STAR/archive/refs/tags/$STAR_V.tar.gz\\"",
"STAR_TARGET=\\"/app/star-$STAR_V.tar.gz\\"",
"STAR_INSTALL_DIR=\\"/app/STAR-$STAR_V\\"",
"STAR_BINARY=STAR"
]
}
]
},
{
"type" : "native",
"id" : "native"
}
],
"build_info" : {
"config" : "/workdir/root/repo/src/parallel_map/config.vsh.yaml",
"runner" : "nextflow",
"engine" : "docker|native",
"output" : "target/nextflow/parallel_map",
"viash_version" : "0.9.4",
"git_commit" : "c7c84719b518068592943d365eddf2d15a6658be",
"git_remote" : "https://github.com/viash-hub/htrnaseq",
"git_tag" : "v0.7.2-3-gc7c8471"
},
"package_config" : {
"name" : "htrnaseq",
"version" : "main",
"summary" : "A workflow for high-throughput RNA-seq data analyses.\n",
"description" : "This workflow is designed to process high-throughput RNA-seq data, where every\nwell of a microarray plate is a sample. A fasta file provided as input\ndefines the mapping between sample barcodes and wells.\n\nThe workflow is built in a modular fashion, where most of the base functionality\nis provided by components from [`biobox`](https://www.viash-hub.com/packages/biobox/latest)\nsupplemented by custom base components and workflow components in this package.\n\nThe full workflow is split in two major subworkflows that can be run independently:\n\n* **Well-demultiplexing:** Split the input (plate/pool level) fastq files per well.\n* **Mapping, counting and QC:** Run per-well mapping, counting and generate QC reports.\n\nEach of those can be started individually, or the full workflow can be run in two ways:\n\n1. Run the [main workflow](https://www.viash-hub.com/packages/htrnaseq/v0.3.0/components/workflows/htrnaseq) \ncontaining the main functionality.\n2. Run the [(opinionated) `runner`](https://www.viash-hub.com/packages/htrnaseq/v0.3.0/components/workflows/runner) where a\nnumber of choices (input/output structure and location) have been made.\n\nInput for the workflow has to be `fastq` files (zipped or not). For bcl or other formats, please consider running\n[demultiplex](https://www.viash-hub.com/packages/demultiplex) first.\n",
"info" : {
"test_resources" : [
{
"path" : "gs://viash-hub-resources/htrnaseq/v1",
"dest" : "resources_test"
}
]
},
"viash_version" : "0.9.4",
"source" : "src",
"target" : "target",
"config_mods" : [
".requirements.commands := ['ps']\n.runners[.type == 'nextflow'].config.script := 'includeConfig(\\"nextflow_labels.config\\")'\n.resources += {path: '/src/config/labels.config', dest: 'nextflow_labels.config'}\n",
".engines += { type: \\"native\\" }",
".engines[.type == 'docker'].target_registry := 'images.viash-hub.com'",
".engines[.type == 'docker'].target_tag := 'main'"
],
"keywords" : [
"bioinformatics",
"sequencing",
"high-throughput",
"RNAseq",
"mapping",
"counting",
"pipeline",
"workflow"
],
"license" : "MIT",
"organization" : "vsh",
"links" : {
"repository" : "https://github.com/viash-hub/htrnaseq",
"issue_tracker" : "https://github.com/viash-hub/htrnaseq/issues"
}
}
}'''))
]
// resolve dependencies (if any)
// inner workflow
// inner workflow hook
/**
 * Build the VDSL3 workflow for the 'parallel_map' component.
 *
 * The embedded script is a two-stage bash program: the outer part writes the
 * component script to '.viash_script.sh' through an unquoted heredoc (so
 * every '$' that must survive until the inner script runs is escaped), and
 * then executes that file with bash.
 *
 * The inner script:
 *   1. validates that --output contains exactly one '*' wildcard,
 *   2. reads well IDs and barcodes from the barcodes FASTA with seqkit and
 *      checks that the barcodes are unique,
 *   3. pairs every barcode with its R1/R2 FASTQ files by file name prefix,
 *   4. loads the STAR reference genome and maps all pairs concurrently
 *      with GNU parallel (one '_run' job per barcode),
 *   5. removes the reference genome again and reports the joblog when a job failed.
 *
 * NOTE: the lines inside the string literal must stay at column 0, because
 * the heredoc terminators (VIASHMAIN, EOF, HERE) are matched at line start.
 *
 * @param args The workflow arguments, passed through to vdsl3WorkflowFactory
 * @return The workflow created by vdsl3WorkflowFactory
 */
def innerWorkflowFactory(args) {
  def rawScript = '''set -e
tempscript=".viash_script.sh"
cat > "$tempscript" << VIASHMAIN
#!/bin/bash
## VIASH START
# The following code has been auto-generated by Viash.
$( if [ ! -z ${VIASH_PAR_INPUT_R1+x} ]; then echo "${VIASH_PAR_INPUT_R1}" | sed "s#'#'\\"'\\"'#g;s#.*#par_input_r1='&'#" ; else echo "# par_input_r1="; fi )
$( if [ ! -z ${VIASH_PAR_INPUT_R2+x} ]; then echo "${VIASH_PAR_INPUT_R2}" | sed "s#'#'\\"'\\"'#g;s#.*#par_input_r2='&'#" ; else echo "# par_input_r2="; fi )
$( if [ ! -z ${VIASH_PAR_GENOMEDIR+x} ]; then echo "${VIASH_PAR_GENOMEDIR}" | sed "s#'#'\\"'\\"'#g;s#.*#par_genomeDir='&'#" ; else echo "# par_genomeDir="; fi )
$( if [ ! -z ${VIASH_PAR_BARCODESFASTA+x} ]; then echo "${VIASH_PAR_BARCODESFASTA}" | sed "s#'#'\\"'\\"'#g;s#.*#par_barcodesFasta='&'#" ; else echo "# par_barcodesFasta="; fi )
$( if [ ! -z ${VIASH_PAR_UMILENGTH+x} ]; then echo "${VIASH_PAR_UMILENGTH}" | sed "s#'#'\\"'\\"'#g;s#.*#par_umiLength='&'#" ; else echo "# par_umiLength="; fi )
$( if [ ! -z ${VIASH_PAR_LIMITBAMSORTRAM+x} ]; then echo "${VIASH_PAR_LIMITBAMSORTRAM}" | sed "s#'#'\\"'\\"'#g;s#.*#par_limitBAMsortRAM='&'#" ; else echo "# par_limitBAMsortRAM="; fi )
$( if [ ! -z ${VIASH_PAR_RUNTHREADN+x} ]; then echo "${VIASH_PAR_RUNTHREADN}" | sed "s#'#'\\"'\\"'#g;s#.*#par_runThreadN='&'#" ; else echo "# par_runThreadN="; fi )
$( if [ ! -z ${VIASH_PAR_OUTPUT+x} ]; then echo "${VIASH_PAR_OUTPUT}" | sed "s#'#'\\"'\\"'#g;s#.*#par_output='&'#" ; else echo "# par_output="; fi )
$( if [ ! -z ${VIASH_PAR_JOBLOG+x} ]; then echo "${VIASH_PAR_JOBLOG}" | sed "s#'#'\\"'\\"'#g;s#.*#par_joblog='&'#" ; else echo "# par_joblog="; fi )
$( if [ ! -z ${VIASH_META_NAME+x} ]; then echo "${VIASH_META_NAME}" | sed "s#'#'\\"'\\"'#g;s#.*#meta_name='&'#" ; else echo "# meta_name="; fi )
$( if [ ! -z ${VIASH_META_FUNCTIONALITY_NAME+x} ]; then echo "${VIASH_META_FUNCTIONALITY_NAME}" | sed "s#'#'\\"'\\"'#g;s#.*#meta_functionality_name='&'#" ; else echo "# meta_functionality_name="; fi )
$( if [ ! -z ${VIASH_META_RESOURCES_DIR+x} ]; then echo "${VIASH_META_RESOURCES_DIR}" | sed "s#'#'\\"'\\"'#g;s#.*#meta_resources_dir='&'#" ; else echo "# meta_resources_dir="; fi )
$( if [ ! -z ${VIASH_META_EXECUTABLE+x} ]; then echo "${VIASH_META_EXECUTABLE}" | sed "s#'#'\\"'\\"'#g;s#.*#meta_executable='&'#" ; else echo "# meta_executable="; fi )
$( if [ ! -z ${VIASH_META_CONFIG+x} ]; then echo "${VIASH_META_CONFIG}" | sed "s#'#'\\"'\\"'#g;s#.*#meta_config='&'#" ; else echo "# meta_config="; fi )
$( if [ ! -z ${VIASH_META_TEMP_DIR+x} ]; then echo "${VIASH_META_TEMP_DIR}" | sed "s#'#'\\"'\\"'#g;s#.*#meta_temp_dir='&'#" ; else echo "# meta_temp_dir="; fi )
$( if [ ! -z ${VIASH_META_CPUS+x} ]; then echo "${VIASH_META_CPUS}" | sed "s#'#'\\"'\\"'#g;s#.*#meta_cpus='&'#" ; else echo "# meta_cpus="; fi )
$( if [ ! -z ${VIASH_META_MEMORY_B+x} ]; then echo "${VIASH_META_MEMORY_B}" | sed "s#'#'\\"'\\"'#g;s#.*#meta_memory_b='&'#" ; else echo "# meta_memory_b="; fi )
$( if [ ! -z ${VIASH_META_MEMORY_KB+x} ]; then echo "${VIASH_META_MEMORY_KB}" | sed "s#'#'\\"'\\"'#g;s#.*#meta_memory_kb='&'#" ; else echo "# meta_memory_kb="; fi )
$( if [ ! -z ${VIASH_META_MEMORY_MB+x} ]; then echo "${VIASH_META_MEMORY_MB}" | sed "s#'#'\\"'\\"'#g;s#.*#meta_memory_mb='&'#" ; else echo "# meta_memory_mb="; fi )
$( if [ ! -z ${VIASH_META_MEMORY_GB+x} ]; then echo "${VIASH_META_MEMORY_GB}" | sed "s#'#'\\"'\\"'#g;s#.*#meta_memory_gb='&'#" ; else echo "# meta_memory_gb="; fi )
$( if [ ! -z ${VIASH_META_MEMORY_TB+x} ]; then echo "${VIASH_META_MEMORY_TB}" | sed "s#'#'\\"'\\"'#g;s#.*#meta_memory_tb='&'#" ; else echo "# meta_memory_tb="; fi )
$( if [ ! -z ${VIASH_META_MEMORY_PB+x} ]; then echo "${VIASH_META_MEMORY_PB}" | sed "s#'#'\\"'\\"'#g;s#.*#meta_memory_pb='&'#" ; else echo "# meta_memory_pb="; fi )
$( if [ ! -z ${VIASH_META_MEMORY_KIB+x} ]; then echo "${VIASH_META_MEMORY_KIB}" | sed "s#'#'\\"'\\"'#g;s#.*#meta_memory_kib='&'#" ; else echo "# meta_memory_kib="; fi )
$( if [ ! -z ${VIASH_META_MEMORY_MIB+x} ]; then echo "${VIASH_META_MEMORY_MIB}" | sed "s#'#'\\"'\\"'#g;s#.*#meta_memory_mib='&'#" ; else echo "# meta_memory_mib="; fi )
$( if [ ! -z ${VIASH_META_MEMORY_GIB+x} ]; then echo "${VIASH_META_MEMORY_GIB}" | sed "s#'#'\\"'\\"'#g;s#.*#meta_memory_gib='&'#" ; else echo "# meta_memory_gib="; fi )
$( if [ ! -z ${VIASH_META_MEMORY_TIB+x} ]; then echo "${VIASH_META_MEMORY_TIB}" | sed "s#'#'\\"'\\"'#g;s#.*#meta_memory_tib='&'#" ; else echo "# meta_memory_tib="; fi )
$( if [ ! -z ${VIASH_META_MEMORY_PIB+x} ]; then echo "${VIASH_META_MEMORY_PIB}" | sed "s#'#'\\"'\\"'#g;s#.*#meta_memory_pib='&'#" ; else echo "# meta_memory_pib="; fi )
## VIASH END
set -eo pipefail
# Check if wildcard character is present in output folder template
printf "Checking if output folder template (\\$par_output) contains a single wildcard character '*'. "
output_glob_character="\\${par_output//[^\\\\*]}"
if [[ "\\${#output_glob_character}" -ne "1" ]]; then
echo "The value for --output must contain exactly one '*' character. Exiting..."
exit 1
else
echo "Done, wildcard character found!"
fi
# Split the delimited strings into arrays
IFS=';' read -r -a input_r1 <<< "\\$par_input_r1"
IFS=';' read -r -a input_r2 <<< "\\$par_input_r2"
# Read barcodes FASTA
# seqkit will make sure to take the leading non-whitespace as sequence identifier (ID)
# Luckily, this is the same as how cutadapt determines an adapter name from the FASTA header.
readarray -t well_ids < <(seqkit seq --name "\\$par_barcodesFasta" )
readarray -t barcodes < <(seqkit seq --seq --upper-case --remove-gaps --gap-letters '^' --validate-seq "\\$par_barcodesFasta")
# Function to test for unique values in array
function arrayContainsUniqueValues {
# Pass the argument by reference
local -n arr=\\$1
# Create a temporary associative array
# in order to use its uniqueness of keys
# 'declare' in a function is automatically local
declare -A uniq_tmp
for item in "\\${arr[@]}"; do
uniq_tmp[\\$item]=0 # assigning a placeholder
done
local unique_array_values=(\\${!uniq_tmp[@]})
if [ "\\${#unique_array_values[@]}" -eq "\\${#arr[@]}" ]; then
return
fi
false
}
arrayContainsUniqueValues barcodes
is_array_unique_exit_code=\\$?
if ! (exit \\$is_array_unique_exit_code); then
echo "The provided barcodes should be unique!"
# Print the barcodes that were read from the FASTA file
echo "Values: \\${barcodes[*]}"
exit 1
fi
# Check that the number of values provided for the fastq files are the same.
num_r1_inputs="\\${#input_r1[@]}"
num_r2_inputs="\\${#input_r2[@]}"
if [ ! "\\$num_r1_inputs" -eq "\\$num_r2_inputs" ]; then
echo "The number of values for arguments "\\\\
"'input_r1' (\\$num_r1_inputs) and 'input_r2' (\\$num_r2_inputs) "\\\\
"should be the same."
exit 1
else
echo "Checked if the same as the number of R1 FASTQ (\\$num_r1_inputs) and R2 FASTQ files "\\\\
"(\\$num_r2_inputs) were provided. Seems OK!"
fi
# Loop over the well IDs and match them to the input FASTQ files
# The FASTQ file names should have the format {well_id}_R(1|2).fastq,
# which is the output format that the cutadapt component uses for demultiplexing.
# sorted_input_r1 and sorted_input_r2 are the input FASTQ files sorted by the order
# of the barcodes in the barcodes array (i.e. the order in the barcodes FASTA file).
declare -a sorted_input_r1=()
declare -a sorted_input_r2=()
for barcode_index in "\\${!barcodes[@]}"; do
barcode="\\${barcodes[\\$barcode_index]}"
well_id="\\${well_ids[\\$barcode_index]}"
echo "Finding FASTQ files for barcode \\${barcode}, well ID '\\${well_id}'."
# The FASTQ files for a particular barcode must match the following regex:
input_file_regex="^\\${well_id}_R[1-2]"
for r1_index in "\\${!input_r1[@]}"; do
r1_file_path=\\${input_r1[\\$r1_index]}
r2_file_path=\\${input_r2[\\$r1_index]}
# Get the file names from the full path
r1_file_name=\\$(basename -- "\\$r1_file_path")
r2_file_name=\\$(basename -- "\\$r2_file_path")
# Check if the file names match the regex
if [[ \\$r1_file_name =~ \\$input_file_regex ]]; then
echo "Matched with \\$r1_file_name and \\$r2_file_name."
# If the R1 FASTQ file matched the regex,
# the R2 file must have also been matched
if ! [[ \\$r2_file_name =~ \\$input_file_regex ]]; then
echo "File \\${r1_file_name} matched with regex \\${input_file_regex} "\\\\
"but \\${r2_file_name} did not! Make sure that the order of "\\\\
"the R1 and R2 input files match."
exit 1
fi
# Add the matched pair to the sorted inputs
sorted_input_r1+=("\\$r1_file_path")
sorted_input_r2+=("\\$r2_file_path")
# Do not continue looking for more files for this barcode
# '2' to affect the *outer* loop (which indeed loops barcodes)!
continue 2
fi
done
echo "Did not find FASTQ files files for well \\${well_id}! "\\\\
"Make sure that the input files have the correct file name format."\\\\
"Input files: \\${input_r1[@]}"
exit 1
done
# Define the function that will be used to run a single job
function _run() {
local par_UMIlength="\\$1"
local par_output="\\$2"
local par_genomeDir="\\$3"
local par_limitBAMsortRAM="\\$4"
local par_runThreadN="\\$5"
local barcode="\\$6"
local input_R1="\\$7"
local input_R2="\\$8"
local barcode_length="\\${#barcode}"
local umi_start="\\$((\\$barcode_length + 1))"
set -eo pipefail
# Use cat to print the here-document: echo does not read standard input
cat <<-EOF
Processing \\$barcode
For the following inputs (lanes):
\\$input_R1 \\$input_R2
EOF
echo "Writing barcode '\\$barcode' to \\$barcode.txt and using it as input".
# Note that there is no possible conflict between jobs here
# because the barcodes are unique (and the barcode is part of the name
# of the file).
echo "\\$barcode" > "\\$barcode.txt"
local dir="\\${par_output//\\\\*/\\$barcode}/"
echo "Setting output for barcode '\\$barcode' to '\\$dir'."
mkdir -p "\\$dir"
# check if files are compressed
local TMPDIR=\\$(mktemp -d "\\$meta_temp_dir/parallel_map-\\$barcode-XXXXXX")
function clean_up {
[[ -d "\\$TMPDIR" ]] && rm -r "\\$TMPDIR"
}
trap clean_up RETURN
# Decompress the input files when needed
# NOTE: for some reason, using STAR's --readFilesCommand does not always work
# This might be because STAR creates fifo files (see https://man7.org/linux/man-pages/man7/fifo.7.html)
# and this requires a filesystem that supports this. Another cause might be that the input files
# are symlinks. When testing this, using '--readFilesCommand "zcat"'
# always produced empty BAM files, but also a succesfull exit code (0) so the problem is not reported.
# However, the logs showed the following error: "gzip -: unexpected end of file".
function is_gzipped {
printf "Checking if input '\\$1' (barcode '\\$barcode') is gzipped... "
if file "\\$1" | grep -q 'gzip'; then
echo "Done, detected compressed file."
return
fi
echo "Done, file does not need decompression."
false
}
# Resolve symbolic links to actual file paths
# (paths are quoted so that names containing whitespace are handled correctly)
input_R1=\\$(realpath "\\$input_R1")
input_R2=\\$(realpath "\\$input_R2")
if is_gzipped "\\$input_R1"; then
local compressed_file_name_r1="\\$(basename -- "\\$input_R1")"
local uncompressed_file_r1="\\$TMPDIR/\\${compressed_file_name_r1%.gz}"
printf "Unpacking input to \\$uncompressed_file_r1... "
zcat "\\$input_R1" > "\\$uncompressed_file_r1"
echo "Decompression done."
else
local uncompressed_file_r1="\\$input_R1"
fi
if is_gzipped "\\$input_R2"; then
local compressed_file_name_r2="\\$(basename -- "\\$input_R2")"
local uncompressed_file_r2="\\$TMPDIR/\\${compressed_file_name_r2%.gz}"
printf "Unpacking input to \\$uncompressed_file_r2... "
zcat "\\$input_R2" > "\\$uncompressed_file_r2"
echo "Decompression done."
else
local uncompressed_file_r2="\\$input_R2"
fi
local n_input_lines_r1=\\$(wc -l < "\\$uncompressed_file_r1")
local n_input_lines_r2=\\$(wc -l < "\\$uncompressed_file_r2")
printf "Checking if length of input file mates match. "
if (( \\$n_input_lines_r1 != n_input_lines_r2 )); then
echo "The length of file \\$input_R1 (\\$n_input_lines_r1) does not match with \\$input_R2 (\\$n_input_lines_r2)"
return 1
else
echo "Seems OK, \\$n_input_lines_r1 input lines."
fi
echo "Starting STAR for barcode '\\$barcode'"
# soloType 'Droplet' is the same as 'CB_UMI_Simple': one UMI and one cell barcode of fixed length.
# By default in this mode, STAR will look for the cell barcode and the UMI int the last files specified with --readFilesIn
# So we need to specify R2 first and R1 second, because R1 contains the barcode and UMI.
# Also, you might be tempted to use '--soloBarcodeMate 1' to alter this behavior, but this requires the clipping
# the barcode from this mate by specifying --clip5pNbases and/or --clip3pNbases, which we do not want to do.
STAR \\\\
--readFilesIn "\\$uncompressed_file_r2" "\\$uncompressed_file_r1" \\\\
--soloType Droplet \\\\
--quantMode GeneCounts \\\\
--genomeLoad LoadAndKeep \\\\
--limitBAMsortRAM "\\$par_limitBAMsortRAM" \\\\
--runThreadN "\\$par_runThreadN" \\\\
--outFilterMultimapNmax 1 \\\\
--outSAMtype BAM SortedByCoordinate \\\\
--soloCBstart 1 \\\\
--readFilesType "Fastx" \\\\
--soloCBlen "\\$barcode_length" \\\\
--soloUMIstart "\\$umi_start" \\\\
--soloUMIlen "\\$par_UMIlength" \\\\
--soloBarcodeReadLength 0 \\\\
--soloStrand Unstranded \\\\
--soloFeatures Gene \\\\
--genomeDir "\\$par_genomeDir" \\\\
--outReadsUnmapped Fastx \\\\
--outSAMunmapped Within \\\\
--outSAMattributes NH HI nM AS CR UR CB UB GX GN \\\\
--soloCBwhitelist "\\$barcode.txt" \\\\
--outFileNamePrefix "\\$dir" \\\\
--outTmpDir "\\$TMPDIR/STARtemp/"
printf "Done running STAR. "
# Check if the number of processed reads is equal to the number of input reads
local n_input_reads=\\$((\\$n_input_lines_r1 / 4))
local nr_output_reads=\\$(grep -Po "Number\\\\ of\\\\ input\\\\ reads \\\\\\\\|\\\\W*\\\\K\\\\d+" "\\$dir/Log.final.out")
if (( \\$nr_output_reads != \\$n_input_reads )); then
echo "Not all input reads were processed for barcode \\$barcode."
return 1
else
echo "Processed \\$nr_output_reads reads for barcode \\$barcode".
fi
printf "Making sure that the output has the proper permissions."
find "\\$dir" -type d -exec chmod o+x {} \\\\;
chmod -R o+r "\\$dir"
echo "Done"
}
# Export the function - requires bash
export -f _run
# Also export the temporary directory used inside _run: jobs started by GNU parallel
# run in a new shell and only see exported variables and functions.
export meta_temp_dir
# Load reference genome
echo "Loading reference genome"
STAR --genomeLoad LoadAndExit --genomeDir "\\$par_genomeDir"
# Run the concurrent jobs using GNU parallel
# Make sure that parallel uses the correct shell
export PARALLEL_SHELL="/bin/bash"
# Some notes:
# --halt soon,fail=1: when a job has failed, instruct parallel to not start new jobs
# and to exit once the jobs that are still running have finished.
#
# ::: is a special syntax for GNU parallel to delineate inputs
# If multiple ::: are given, each group will be treated as an input source, and all combinations of input
# sources will be generated. E.g. ::: 1 2 ::: a b c will result in the combinations (1,a) (1,b) (1,c) (2,a) (2,b) (2,c)
# The delimiter :::+ (note the extra '+') links the argument to the previous argument, and one argument from each of the input
# sources will be read.
parallel_cmd=("parallel" "--jobs" "80%" "--verbose" "--memfree" "2G"
"--tmpdir" "\\$meta_temp_dir"
"--retry-failed" "--retries" "4" "--halt" "soon,fail=1"
"--joblog" "\\$par_joblog" "_run" "{}")
# Arguments for which there is one value, so these will not create extra jobs
parallel_cmd+=(":::" "\\$par_umiLength" ":::" "\\$par_output" ":::" "\\$par_genomeDir" ":::" "\\$par_limitBAMsortRAM" ":::" "\\$par_runThreadN")
# Argument which in fact will cause extra jobs to be spawned, per job one item from each argument will be selected
# Thus, these argument lists should have the same length.
parallel_cmd+=(":::" "\\${barcodes[@]}" ":::+" "\\${sorted_input_r1[@]}" ":::+" "\\${sorted_input_r2[@]}")
set +eo pipefail
"\\${parallel_cmd[@]}"
exit_code=\\$?
set -eo pipefail
echo "GNU parallel finished!"
# Unload reference
printf "Unloading reference genome. "
STAR --genomeLoad Remove --genomeDir "\\$par_genomeDir"
echo "Done!"
# Exit code from GNU parallel:
# If fail=1 is used, the exit status will be the exit status of the failing job.
echo "Checking exit code"
if ((exit_code>0)); then
# Note that the ending HERE must be indented with TAB characters (not spaces)
# in order to remove leading indentation
MESSAGE=\\$(
cat <<-HERE
==================================================================
!!! An error occurred for one of the jobs.
Exit code of the failing job: \\$exit_code.
%s
==================================================================
HERE
)
printf "\\$MESSAGE" "\\$(<"\\$par_joblog")"
exit 1
else
cat <<-HERE
==================================================================
Mapping went fine (exit code '\\$exit_code'), zero errors occurred
==================================================================
HERE
fi
VIASHMAIN
bash "$tempscript"
'''
  return vdsl3WorkflowFactory(args, meta, rawScript)
}
/**
 * Generate a workflow for VDSL3 modules.
 *
 * This function is called by innerWorkflowFactory().
 *
 * Input channel: [id, input_map]
 * Output channel: [id, output_map]
 *
 * Internally, this workflow will convert the input channel
 * to a format which the Nextflow module will be able to handle:
 * [id, <one list of paths per input file argument>..., args_without_input_files, resources_dir].
 * The process output is converted back into a map of output file arguments.
 *
 * @param args The workflow arguments; args["key"] is used in error messages
 *   and to substitute '$key' in output file names
 * @param meta The meta object, providing meta.config.allArguments and meta.resources_dir
 * @param rawScript The script which is passed on to _vdsl3ProcessFactory()
 * @return The generated workflow
 */
def vdsl3WorkflowFactory(Map args, Map meta, String rawScript) {
  def key = args["key"]
  // the process is created lazily, the first time the workflow body is evaluated
  def processObj = null

  workflow processWf {
    take: input_
    main:

    if (processObj == null) {
      processObj = _vdsl3ProcessFactory(args, meta, rawScript)
    }

    output_ = input_
      | map { tuple ->
        def id = tuple[0]
        def data_ = tuple[1]

        if (workflow.stubRun) {
          // add id if missing
          data_ = [id: 'stub'] + data_
        }

        // process input files separately:
        // one (possibly empty) list of files per input file argument
        def inputPaths = meta.config.allArguments
          .findAll { it.type == "file" && it.direction == "input" }
          .collect { par ->
            def val = data_.containsKey(par.plainName) ? data_[par.plainName] : []
            def inputFiles = []
            if (val == null) {
              inputFiles = []
            } else if (val instanceof List) {
              inputFiles = val
            } else if (val instanceof Path) {
              inputFiles = [ val ]
            } else {
              inputFiles = []
            }
            if (!workflow.stubRun) {
              // throw error when an input file doesn't exist
              inputFiles.each{ file ->
                assert file.exists() :
                  "Error in module '${key}' id '${id}' argument '${par.plainName}'.\n" +
                  " Required input file does not exist.\n" +
                  " Path: '$file'.\n" +
                  " Expected input file to exist"
              }
            }
            inputFiles
          }

        // remove input files
        def argsExclInputFiles = meta.config.allArguments
          .findAll { (it.type != "file" || it.direction != "input") && data_.containsKey(it.plainName) }
          .collectEntries { par ->
            def parName = par.plainName
            def val = data_[parName]
            if (par.multiple && val instanceof Collection) {
              val = val.join(par.multiple_sep)
            }
            if (par.direction == "output" && par.type == "file") {
              // substitute the '$id' and '$key' placeholders in output file names
              val = val
                .replaceAll('\\$id', id)
                .replaceAll('\\$\\{id\\}', id)
                .replaceAll('\\$key', key)
                .replaceAll('\\$\\{key\\}', key)
            }
            [parName, val]
          }

        [ id ] + inputPaths + [ argsExclInputFiles, meta.resources_dir ]
      }
      | processObj
      | map { output ->
        // output[0] is the id, output[index + 1] holds the files of the
        // index-th output file argument
        def outputFiles = meta.config.allArguments
          .findAll { it.type == "file" && it.direction == "output" }
          .indexed()
          .collectEntries{ index, par ->
            def out = output[index + 1]
            // strip dummy '.exitcode' file from output (see nextflow-io/nextflow#2678)
            // NOTE: the parentheses are required, '!out instanceof List' would be
            // parsed as '(!out) instanceof List', which is always false.
            if (!(out instanceof List) || out.size() <= 1) {
              // only the dummy file was emitted: no real output for this argument
              if (par.multiple) {
                out = []
              } else {
                assert !par.required :
                  "Error in module '${key}' id '${output[0]}' argument '${par.plainName}'.\n" +
                  " Required output file is missing"
                out = null
              }
            } else if (out.size() == 2 && !par.multiple) {
              out = out[1]
            } else {
              out = out.drop(1)
            }
            [ par.plainName, out ]
          }

        // drop null outputs
        outputFiles.removeAll{it.value == null}

        [ output[0], outputFiles ]
      }

    emit: output_
  }

  return processWf
}
// depends on: session?
/**
 * Generates, loads and registers a Nextflow process for a VDSL3 module.
 *
 * The process source (directives, input/output declarations, stub and script
 * sections) is rendered as a string, written to a temporary `.nf` file, run as
 * a module through the loader returned by `_getScriptLoader`, and registered in
 * the current `ScriptMeta`.
 *
 * @param workflowArgs Workflow arguments; this function reads `key`, `directives`,
 *   `auto.publish` and `auto.transcript`.
 * @param meta Module metadata; this function reads `meta.config.allArguments`
 *   and `meta.config.name`.
 * @param rawScript The script body to embed in the process. Backslashes, `$` and
 *   triple double quotes are escaped before it is embedded.
 * @return The process looked up in the current `ScriptMeta` under the generated key.
 */
def _vdsl3ProcessFactory(Map workflowArgs, Map meta, String rawScript) {
  // autodetect process key: "<key>_process", with a numeric suffix appended
  // when processes with that prefix are already registered
  def wfKey = workflowArgs["key"]
  def procKeyPrefix = "${wfKey}_process"
  def scriptMeta = nextflow.script.ScriptMeta.current()
  def existing = scriptMeta.getProcessNames().findAll{it.startsWith(procKeyPrefix)}
  // replacing the prefix by "0" maps the bare prefix to 0 and "<prefix>N" to N
  def numbers = existing.collect{it.replace(procKeyPrefix, "0").toInteger()}
  def newNumber = (numbers + [-1]).max() + 1
  def procKey = newNumber == 0 ? procKeyPrefix : "$procKeyPrefix$newNumber"
  if (newNumber > 0) {
    // The message is built as a single string: passing the continuation as a
    // second argument would make the logger treat it as a format argument and,
    // without a placeholder in the first string, silently drop it.
    log.warn(
      "Key for module '${wfKey}' is duplicated.\n" +
      "If you run a component multiple times in the same workflow,\n" +
      "it's recommended you set a unique key for every call,\n" +
      "for example: ${wfKey}.run(key: \"foo\")."
    )
  }

  // subset directives and convert to list of tuples
  def drctv = workflowArgs.directives

  // TODO: unit test the two commands below
  // convert publish array into tags
  // Declared before it is assigned so that the closure body can call itself
  // for nested lists and maps.
  def valueToStr
  valueToStr = { val ->
    // ignore closures: strings of the form "{...}" are emitted unquoted
    if (val instanceof CharSequence) {
      if (!val.matches('^[{].*[}]$')) {
        '"' + val + '"'
      } else {
        val
      }
    } else if (val instanceof List) {
      "[" + val.collect{valueToStr(it)}.join(", ") + "]"
    } else if (val instanceof Map) {
      "[" + val.collect{k, v -> k + ": " + valueToStr(v)}.join(", ") + "]"
    } else {
      val.inspect()
    }
  }

  // multiple entries allowed: label, publishdir
  def drctvStrs = drctv.collect { key, value ->
    if (key in ["label", "publishDir"]) {
      // one directive line per entry; null entries are skipped
      value.collect{ val ->
        if (val instanceof Map) {
          "\n$key " + val.collect{ k, v -> k + ": " + valueToStr(v) }.join(", ")
        } else if (val == null) {
          ""
        } else {
          "\n$key " + valueToStr(val)
        }
      }.join()
    } else if (value instanceof Map) {
      "\n$key " + value.collect{ k, v -> k + ": " + valueToStr(v) }.join(", ")
    } else {
      "\n$key " + valueToStr(value)
    }
  }.join()

  // one staged `path(...)` input per input file argument
  def inputPaths = meta.config.allArguments
    .findAll { it.type == "file" && it.direction == "input" }
    .collect { ', path(viash_par_' + it.plainName + ', stageAs: "_viash_par/' + it.plainName + '_?/*")' }
    .join()

  // one `path{...}` output per output file argument
  def outputPaths = meta.config.allArguments
    .findAll { it.type == "file" && it.direction == "output" }
    .collect { par ->
      // insert dummy into every output (see nextflow-io/nextflow#2678)
      if (!par.multiple) {
        ', path{[".exitcode", args.' + par.plainName + ']}'
      } else {
        ', path{[".exitcode"] + args.' + par.plainName + '}'
      }
    }
    .join()

  // TODO: move this functionality somewhere else?
  if (workflowArgs.auto.transcript) {
    outputPaths = outputPaths + ', path{[".exitcode", ".command*"]}'
  } else {
    outputPaths = outputPaths + ', path{[".exitcode"]}'
  }

  // create dirs for output files (based on BashWrapper.createParentFiles)
  def createParentStr = meta.config.allArguments
    .findAll { it.type == "file" && it.direction == "output" && it.create_parent }
    .collect { par ->
      def contents = "args[\"${par.plainName}\"] instanceof List ? args[\"${par.plainName}\"].join('\" \"') : args[\"${par.plainName}\"]"
      "\${ args.containsKey(\"${par.plainName}\") ? \"mkdir_parent '\" + escapeText(${contents}) + \"'\" : \"\" }"
    }
    .join("\n")

  // construct inputFileExports
  def inputFileExports = meta.config.allArguments
    .findAll { it.type == "file" && it.direction.toLowerCase() == "input" }
    .collect { par ->
      def contents = "viash_par_${par.plainName} instanceof List ? viash_par_${par.plainName}.join(\"${par.multiple_sep}\") : viash_par_${par.plainName}"
      "\n\${viash_par_${par.plainName}.empty ? \"\" : \"export VIASH_PAR_${par.plainName.toUpperCase()}='\" + escapeText(${contents}) + \"'\"}"
    }

  // NOTE: if using docker, use /tmp instead of tmpDir!
  // first environment variable that is set wins; falls back to '/tmp'
  def tmpDir = java.nio.file.Paths.get(
    System.getenv('NXF_TEMP') ?:
    System.getenv('VIASH_TEMP') ?:
    System.getenv('VIASH_TMPDIR') ?:
    System.getenv('VIASH_TEMPDIR') ?:
    System.getenv('VIASH_TMP') ?:
    System.getenv('TEMP') ?:
    System.getenv('TMPDIR') ?:
    System.getenv('TEMPDIR') ?:
    System.getenv('TMP') ?:
    '/tmp'
  ).toAbsolutePath()

  // construct stub: touch every output file that was passed as an argument
  def stub = meta.config.allArguments
    .findAll { it.type == "file" && it.direction == "output" }
    .collect { par ->
      "\${ args.containsKey(\"${par.plainName}\") ? \"touch2 \\\"\" + (args[\"${par.plainName}\"] instanceof String ? args[\"${par.plainName}\"].replace(\"_*\", \"_0\") : args[\"${par.plainName}\"].join('\" \"')) + \"\\\"\" : \"\" }"
    }
    .join("\n")

  // escape script
  def escapedScript = rawScript.replace('\\', '\\\\').replace('$', '\\$').replace('"""', '\\"\\"\\"')

  // publishdir assert
  def assertStr = (workflowArgs.auto.publish == true) || workflowArgs.auto.transcript ?
    """\nassert task.publishDir.size() > 0: "if auto.publish is true, params.publish_dir needs to be defined.\\n Example: --publish_dir './output/'" """ :
    ""

  // generate process string
  // (whitespace before each '|' is removed by stripMargin)
  def procStr =
    """nextflow.enable.dsl=2
    |
    |def escapeText = { s -> s.toString().replaceAll("'", "'\\\"'\\\"'") }
    |process $procKey {$drctvStrs
    |input:
    | tuple val(id)$inputPaths, val(args), path(resourcesDir, stageAs: ".viash_meta_resources")
    |output:
    | tuple val("\$id")$outputPaths, optional: true
    |stub:
    |\"\"\"
    |touch2() { mkdir -p "\\\$(dirname "\\\$1")" && touch "\\\$1" ; }
    |$stub
    |\"\"\"
    |script:$assertStr
    |def parInject = args
    | .findAll{key, value -> value != null}
    | .collect{key, value -> "export VIASH_PAR_\${key.toUpperCase()}='\${escapeText(value)}'"}
    | .join("\\n")
    |\"\"\"
    |# meta exports
    |export VIASH_META_RESOURCES_DIR="\${resourcesDir}"
    |export VIASH_META_TEMP_DIR="${['docker', 'podman', 'charliecloud'].any{ it == workflow.containerEngine } ? '/tmp' : tmpDir}"
    |export VIASH_META_NAME="${meta.config.name}"
    |# export VIASH_META_EXECUTABLE="\\\$VIASH_META_RESOURCES_DIR/\\\$VIASH_META_NAME"
    |export VIASH_META_CONFIG="\\\$VIASH_META_RESOURCES_DIR/.config.vsh.yaml"
    |\${task.cpus ? "export VIASH_META_CPUS=\$task.cpus" : "" }
    |\${task.memory?.bytes != null ? "export VIASH_META_MEMORY_B=\$task.memory.bytes" : "" }
    |if [ ! -z \\\${VIASH_META_MEMORY_B+x} ]; then
    | export VIASH_META_MEMORY_KB=\\\$(( (\\\$VIASH_META_MEMORY_B+999) / 1000 ))
    | export VIASH_META_MEMORY_MB=\\\$(( (\\\$VIASH_META_MEMORY_KB+999) / 1000 ))
    | export VIASH_META_MEMORY_GB=\\\$(( (\\\$VIASH_META_MEMORY_MB+999) / 1000 ))
    | export VIASH_META_MEMORY_TB=\\\$(( (\\\$VIASH_META_MEMORY_GB+999) / 1000 ))
    | export VIASH_META_MEMORY_PB=\\\$(( (\\\$VIASH_META_MEMORY_TB+999) / 1000 ))
    | export VIASH_META_MEMORY_KIB=\\\$(( (\\\$VIASH_META_MEMORY_B+1023) / 1024 ))
    | export VIASH_META_MEMORY_MIB=\\\$(( (\\\$VIASH_META_MEMORY_KIB+1023) / 1024 ))
    | export VIASH_META_MEMORY_GIB=\\\$(( (\\\$VIASH_META_MEMORY_MIB+1023) / 1024 ))
    | export VIASH_META_MEMORY_TIB=\\\$(( (\\\$VIASH_META_MEMORY_GIB+1023) / 1024 ))
    | export VIASH_META_MEMORY_PIB=\\\$(( (\\\$VIASH_META_MEMORY_TIB+1023) / 1024 ))
    |fi
    |
    |# meta synonyms
    |export VIASH_TEMP="\\\$VIASH_META_TEMP_DIR"
    |export TEMP_DIR="\\\$VIASH_META_TEMP_DIR"
    |
    |# create output dirs if need be
    |function mkdir_parent {
    | for file in "\\\$@"; do
    | mkdir -p "\\\$(dirname "\\\$file")"
    | done
    |}
    |$createParentStr
    |
    |# argument exports${inputFileExports.join()}
    |\$parInject
    |
    |# process script
    |${escapedScript}
    |\"\"\"
    |}
    |""".stripMargin()

  // TODO: print on debug
  // if (workflowArgs.debug == true) {
  //   println("######################\n$procStr\n######################")
  // }

  // write process to temp file; the file is removed again on JVM shutdown
  def tempFile = java.nio.file.Files.createTempFile("viash-process-${procKey}-", ".nf")
  addShutdownHook { java.nio.file.Files.deleteIfExists(tempFile) }
  tempFile.text = procStr

  // create process from temp file
  def binding = new nextflow.script.ScriptBinding([:])
  def session = nextflow.Nextflow.getSession()
  def parser = _getScriptLoader(session)
    .setModule(true)
    .setBinding(binding)
  def moduleScript = parser.runScript(tempFile)
    .getScript()

  // register module in meta
  def module = new nextflow.script.IncludeDef.Module(name: procKey)
  scriptMeta.addModule(moduleScript, module.name, module.alias)

  // retrieve and return process from meta
  return scriptMeta.getProcess(procKey)
}
// use Reflection to get a ScriptParser / ScriptLoader
// <25.02.0-edge: new nextflow.script.ScriptParser(session)
// >=25.02.0-edge: nextflow.script.ScriptLoaderFactory.create(session)
/**
 * Reflectively obtains a script parser/loader bound to the given session.
 *
 * First tries to instantiate `nextflow.script.ScriptParser` with the session.
 * Only when that class cannot be found does it fall back to the static
 * `nextflow.script.ScriptLoaderFactory.create(session)`.
 *
 * @param session The Nextflow session handed to the parser/loader.
 * @return The object produced by whichever of the two entry points is available.
 * @throws Exception when the fallback lookup or invocation fails; the
 *   underlying reflection error is attached as the cause.
 */
def _getScriptLoader(nextflow.Session session) {
  try {
    // legacy entry point: new ScriptParser(session)
    Class<?> legacyParserClass = Class.forName('nextflow.script.ScriptParser')
    def parserConstructor = legacyParserClass.getDeclaredConstructor(nextflow.Session)
    return parserConstructor.newInstance(session)
  } catch (ClassNotFoundException legacyMissing) {
    // legacy class is absent: use the factory instead
    try {
      Class<?> factoryClass = Class.forName('nextflow.script.ScriptLoaderFactory')
      def factoryMethod = factoryClass.getDeclaredMethod('create', nextflow.Session)
      // `create` is static, so there is no receiver object
      def loader = factoryMethod.invoke(null, session)
      return loader
    } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | java.lang.reflect.InvocationTargetException fallbackFailure) {
      // neither entry point is usable
      throw new Exception("Neither nextflow.script.ScriptParser nor nextflow.script.ScriptLoaderFactory could be found. Is this a compatible Nextflow version?", fallbackFailure)
    }
  }
}
// defaults
// Default workflow arguments for this module. The map is handed to
// workflowFactory below, together with the module key and `meta`.
meta["defaults"] = [
// key to be used to trace the process and determine output names
key: null,
// fixed arguments to be passed to script
args: [:],
// default directives (parsed from the embedded JSON blob)
directives: readJsonBlob('''{
"container" : {
"registry" : "images.viash-hub.com",
"image" : "vsh/htrnaseq/parallel_map",
"tag" : "main"
},
"tag" : "$id"
}'''),
// auto settings (parsed from the embedded JSON blob)
auto: readJsonBlob('''{
"simplifyInput" : true,
"simplifyOutput" : false,
"transcript" : false,
"publish" : false
}'''),
// Apply a map over the incoming tuple
// Example: `{ tup -> [ tup[0], [input: tup[1].output] ] + tup.drop(2) }`
map: null,
// Apply a map over the ID element of a tuple (i.e. the first element)
// Example: `{ id -> id + "_foo" }`
mapId: null,
// Apply a map over the data element of a tuple (i.e. the second element)
// Example: `{ data -> [ input: data.output ] }`
mapData: null,
// Apply a map over the passthrough elements of a tuple (i.e. the tuple excl. the first two elements)
// Example: `{ pt -> pt.drop(1) }`
mapPassthrough: null,
// Filter the channel
// Example: `{ tup -> tup[0] == "foo" }`
filter: null,
// Choose whether or not to run the component on the tuple if the condition is true.
// Otherwise, the tuple will be passed through.
// Example: `{ tup -> tup[0] != "skip_this" }`
runIf: null,
// Rename keys in the data field of the tuple (i.e. the second element)
// Will likely be deprecated in favour of `fromState`.
// Example: `[ "new_key": "old_key" ]`
renameKeys: null,
// Fetch data from the state and pass it to the module without altering the current state.
//
// `fromState` should be `null`, `List[String]`, `Map[String, String]` or a function.
//
// - If it is `null`, the state will be passed to the module as is.
// - If it is a `List[String]`, the data will be the values of the state at the given keys.
// - If it is a `Map[String, String]`, the data will be the values of the state at the given keys, with the keys renamed according to the map.
// - If it is a function, the tuple (`[id, state]`) in the channel will be passed to the function, and the result will be used as the data.
//
// Example: `{ id, state -> [input: state.fastq_file] }`
// Default: `null`
fromState: null,
// Determine how the state should be updated after the module has been run.
//
// `toState` should be `null`, `List[String]`, `Map[String, String]` or a function.
//
// - If it is `null`, the state will be replaced with the output of the module.
// - If it is a `List[String]`, the state will be updated with the values of the data at the given keys.
// - If it is a `Map[String, String]`, the state will be updated with the values of the data at the given keys, with the keys renamed according to the map.
// - If it is a function, a tuple (`[id, output, state]`) will be passed to the function, and the result will be used as the new state.
//
// Example: `{ id, output, state -> state + [counts: state.output] }`
// Default: `{ id, output, state -> output }`
toState: null,
// Whether or not to print debug messages
// Default: `false`
debug: false
]
// initialise default workflow, keyed by the component name from the config
meta["workflow"] = workflowFactory([key: meta.config.name], meta.defaults, meta)
// add workflow to environment by registering it as a definition of the current script
nextflow.script.ScriptMeta.current().addDefinition(meta.workflow)
// anonymous workflow for running this module as a standalone
// Entry workflow used when this module is run standalone: it makes sure an
// `id` argument and an `id` parameter exist, prints the help message, builds
// the input channel from the params and runs the default workflow on it.
workflow {
// add id argument if it's not already in the config
// TODO: deep copy
// NOTE(review): deepClone is already applied to both values below; confirm whether the TODO above is still open
def newConfig = deepClone(meta.config)
def newParams = deepClone(params)
// remember whether the component itself declares an `id` argument
def argsContainsId = newConfig.allArguments.any{it.plainName == "id"}
if (!argsContainsId) {
def idArg = [
'name': '--id',
'required': false,
'type': 'string',
'description': 'A unique id for every entry.',
'multiple': false
]
// prepend the synthetic argument and re-process the modified config
newConfig.arguments.add(0, idArg)
newConfig = processConfig(newConfig)
}
// fall back to the id "run" when no id parameter was provided
if (!newParams.containsKey("id")) {
newParams.id = "run"
}
helpMessage(newConfig)
channelFromParams(newParams, newConfig)
// make sure id is not in the state if id is not in the args
| map {id, state ->
if (!argsContainsId) {
[id, state.findAll{k, v -> k != "id"}]
} else {
[id, state]
}
}
// run the default workflow with the `auto.publish` setting overridden to "state"
| meta.workflow.run(
auto: [ publish: "state" ]
)
}
// END COMPONENT-SPECIFIC CODE