Build branch main with version main (65dd41d)

Build pipeline: viash-hub.htrnaseq.main-vhms8

Source commit: 65dd41d8b1

Source message: Optimize spawning of processes
This commit is contained in:
CI
2024-11-05 17:26:35 +00:00
parent b8abf8c490
commit f2ff92c6ac
127 changed files with 29560 additions and 1004 deletions

View File

@@ -6,23 +6,27 @@ argument_groups:
arguments:
- type: "file"
name: "--input_r1"
description: "R1"
description: "Forward reads in FASTQ format. Multiple files can be provided which\
\ will\nbe demultiplexed separately before joining the results for each individual\
\ well.\n"
info: null
must_exist: true
create_parent: true
required: true
direction: "input"
multiple: false
multiple: true
multiple_sep: ";"
- type: "file"
name: "--input_r2"
description: "R2"
description: "Reverse reads in FASTQ format. Multiple files can be provided which\
\ will\nbe demultiplexed separately before joining the results for each individual\
\ well.\n"
info: null
must_exist: true
create_parent: true
required: true
direction: "input"
multiple: false
multiple: true
multiple_sep: ";"
- type: "file"
name: "--barcodesFasta"
@@ -42,6 +46,15 @@ argument_groups:
direction: "input"
multiple: false
multiple_sep: ";"
- type: "file"
name: "--annotation"
info: null
must_exist: true
create_parent: true
required: true
direction: "input"
multiple: false
multiple_sep: ";"
- name: "Output arguments"
arguments:
- type: "file"
@@ -84,7 +97,51 @@ argument_groups:
name: "--nrReadsNrGenesPerChrom"
info: null
default:
- "nrReadsNrGenesPerChrom.txt"
- "nrReadsNrGenesPerChrom.$id.txt"
must_exist: true
create_parent: true
required: true
direction: "output"
multiple: false
multiple_sep: ";"
- type: "file"
name: "--star_qc_metrics"
info: null
default:
- "starLogs.$id.txt"
must_exist: true
create_parent: true
required: true
direction: "output"
multiple: false
multiple_sep: ";"
- type: "file"
name: "--eset"
info: null
default:
- "eset.$id.rds"
must_exist: true
create_parent: true
required: true
direction: "output"
multiple: false
multiple_sep: ";"
- type: "file"
name: "--f_data"
info: null
default:
- "fData.$id.tsv"
must_exist: true
create_parent: true
required: true
direction: "output"
multiple: false
multiple_sep: ";"
- type: "file"
name: "--p_data"
info: null
default:
- "pData.$id.tsv"
must_exist: true
create_parent: true
required: true
@@ -99,12 +156,20 @@ resources:
- type: "file"
path: "nextflow_labels.config"
dest: "nextflow_labels.config"
test_resources:
- type: "nextflow_script"
path: "test.nf"
is_executable: true
entrypoint: "test_wf"
info: null
status: "enabled"
requirements:
commands:
- "ps"
dependencies:
- name: "stats/combine_star_logs"
repository:
type: "local"
- name: "stats/generate_pool_statistics"
repository:
type: "local"
@@ -120,11 +185,15 @@ dependencies:
- name: "workflows/utils/groupWells"
repository:
type: "local"
- name: "concat_text"
- name: "eset/create_eset"
repository:
type: "vsh"
repo: "craftbox"
tag: "concat_text"
type: "local"
- name: "eset/create_fdata"
repository:
type: "local"
- name: "eset/create_pdata"
repository:
type: "local"
repositories:
- type: "local"
name: "local"
@@ -132,10 +201,6 @@ repositories:
name: "bb"
repo: "biobox"
tag: "v0.1.0"
- type: "vsh"
name: "cb"
repo: "craftbox"
tag: "concat_text"
license: "MIT"
links:
repository: "https://github.com/viash-hub/htrnaseq"
@@ -214,22 +279,28 @@ build_info:
engine: "native|native"
output: "target/nextflow/workflows/htrnaseq"
executable: "target/nextflow/workflows/htrnaseq/main.nf"
viash_version: "0.9.0-RC7"
git_commit: "cf9797232db1306bfd5696287928cababe317d99"
git_remote: "https://x-access-token:ghs_KjB7pWu8DQM3iFulLu7RI06qnt5K8S1A0eaE@github.com/viash-hub/htrnaseq"
viash_version: "0.9.0"
git_commit: "65dd41d8b1b4a307735c72320c96c0880c75f17f"
git_remote: "https://x-access-token:ghs_McZDF0yobnnHmOEb2Q4JaaB3pzr9mz1VbIOs@github.com/viash-hub/htrnaseq"
dependencies:
- "target/nextflow/stats/combine_star_logs"
- "target/nextflow/stats/generate_pool_statistics"
- "target/nextflow/stats/generate_well_statistics"
- "target/nextflow/workflows/well_demultiplex"
- "target/nextflow/workflows/parallel_map_wf"
- "target/nextflow/workflows/utils/groupWells"
- "target/dependencies/vsh/vsh/craftbox/concat_text/nextflow/concat_text"
- "target/nextflow/eset/create_eset"
- "target/nextflow/eset/create_fdata"
- "target/nextflow/eset/create_pdata"
package_config:
name: "htrnaseq"
version: "main"
description: "High-throughput pipeline [WIP]\n"
info: null
viash_version: "0.9.0-RC7"
info:
test_resources:
- path: "gs://viash-hub-test-data/htrnaseq/v1/"
dest: "resources_test"
viash_version: "0.9.0"
source: "src"
target: "target"
config_mods:

View File

@@ -1,8 +1,8 @@
// htrnaseq main
//
// This wrapper script is auto-generated by viash 0.9.0-RC7 and is thus a
// derivative work thereof. This software comes with ABSOLUTELY NO WARRANTY from
// Data Intuitive.
// This wrapper script is auto-generated by viash 0.9.0 and is thus a derivative
// work thereof. This software comes with ABSOLUTELY NO WARRANTY from Data
// Intuitive.
//
// The component may contain files which fall under a different license. The
// authors of this component should specify the license in the header of such
@@ -1728,7 +1728,9 @@ def publishStates(Map args) {
def yamlFilename = yamlTemplate_
.replaceAll('\\$id', id_)
.replaceAll('\\$\\{id\\}', id_)
.replaceAll('\\$key', key_)
.replaceAll('\\$\\{key\\}', key_)
// TODO: do the pathnames in state_ match up with the outputFilenames_?
@@ -1799,7 +1801,9 @@ def publishStatesByConfig(Map args) {
def yamlTemplate = params.containsKey("output_state") ? params.output_state : '$id.$key.state.yaml'
def yamlFilename = yamlTemplate
.replaceAll('\\$id', id_)
.replaceAll('\\$\\{id\\}', id_)
.replaceAll('\\$key', key_)
.replaceAll('\\$\\{key\\}', key_)
def yamlDir = java.nio.file.Paths.get(yamlFilename).getParent()
// the processed state is a list of [key, value, inputPath, outputFilename] tuples, where
@@ -1841,7 +1845,9 @@ def publishStatesByConfig(Map args) {
// instantiate the template
def filename = filenameTemplate
.replaceAll('\\$id', id_)
.replaceAll('\\$\\{id\\}', id_)
.replaceAll('\\$key', key_)
.replaceAll('\\$\\{key\\}', key_)
if (par.multiple) {
// if the parameter is multiple: true, the filename
// should contain a wildcard '*' that is replaced with
@@ -2808,23 +2814,23 @@ meta = [
{
"type" : "file",
"name" : "--input_r1",
"description" : "R1",
"description" : "Forward reads in FASTQ format. Multiple files can be provided which will\nbe demultiplexed separately before joining the results for each individual well.\n",
"must_exist" : true,
"create_parent" : true,
"required" : true,
"direction" : "input",
"multiple" : false,
"multiple" : true,
"multiple_sep" : ";"
},
{
"type" : "file",
"name" : "--input_r2",
"description" : "R2",
"description" : "Reverse reads in FASTQ format. Multiple files can be provided which will\nbe demultiplexed separately before joining the results for each individual well.\n",
"must_exist" : true,
"create_parent" : true,
"required" : true,
"direction" : "input",
"multiple" : false,
"multiple" : true,
"multiple_sep" : ";"
},
{
@@ -2846,6 +2852,16 @@ meta = [
"direction" : "input",
"multiple" : false,
"multiple_sep" : ";"
},
{
"type" : "file",
"name" : "--annotation",
"must_exist" : true,
"create_parent" : true,
"required" : true,
"direction" : "input",
"multiple" : false,
"multiple_sep" : ";"
}
]
},
@@ -2898,7 +2914,59 @@ meta = [
"type" : "file",
"name" : "--nrReadsNrGenesPerChrom",
"default" : [
"nrReadsNrGenesPerChrom.txt"
"nrReadsNrGenesPerChrom.$id.txt"
],
"must_exist" : true,
"create_parent" : true,
"required" : true,
"direction" : "output",
"multiple" : false,
"multiple_sep" : ";"
},
{
"type" : "file",
"name" : "--star_qc_metrics",
"default" : [
"starLogs.$id.txt"
],
"must_exist" : true,
"create_parent" : true,
"required" : true,
"direction" : "output",
"multiple" : false,
"multiple_sep" : ";"
},
{
"type" : "file",
"name" : "--eset",
"default" : [
"eset.$id.rds"
],
"must_exist" : true,
"create_parent" : true,
"required" : true,
"direction" : "output",
"multiple" : false,
"multiple_sep" : ";"
},
{
"type" : "file",
"name" : "--f_data",
"default" : [
"fData.$id.tsv"
],
"must_exist" : true,
"create_parent" : true,
"required" : true,
"direction" : "output",
"multiple" : false,
"multiple_sep" : ";"
},
{
"type" : "file",
"name" : "--p_data",
"default" : [
"pData.$id.tsv"
],
"must_exist" : true,
"create_parent" : true,
@@ -2923,6 +2991,14 @@ meta = [
"dest" : "nextflow_labels.config"
}
],
"test_resources" : [
{
"type" : "nextflow_script",
"path" : "test.nf",
"is_executable" : true,
"entrypoint" : "test_wf"
}
],
"status" : "enabled",
"requirements" : {
"commands" : [
@@ -2930,6 +3006,12 @@ meta = [
]
},
"dependencies" : [
{
"name" : "stats/combine_star_logs",
"repository" : {
"type" : "local"
}
},
{
"name" : "stats/generate_pool_statistics",
"repository" : {
@@ -2961,11 +3043,21 @@ meta = [
}
},
{
"name" : "concat_text",
"name" : "eset/create_eset",
"repository" : {
"type" : "vsh",
"repo" : "craftbox",
"tag" : "concat_text"
"type" : "local"
}
},
{
"name" : "eset/create_fdata",
"repository" : {
"type" : "local"
}
},
{
"name" : "eset/create_pdata",
"repository" : {
"type" : "local"
}
}
],
@@ -2979,12 +3071,6 @@ meta = [
"name" : "bb",
"repo" : "biobox",
"tag" : "v0.1.0"
},
{
"type" : "vsh",
"name" : "cb",
"repo" : "craftbox",
"tag" : "concat_text"
}
],
"license" : "MIT",
@@ -3078,15 +3164,23 @@ meta = [
"runner" : "nextflow",
"engine" : "native|native",
"output" : "target/nextflow/workflows/htrnaseq",
"viash_version" : "0.9.0-RC7",
"git_commit" : "cf9797232db1306bfd5696287928cababe317d99",
"git_remote" : "https://x-access-token:ghs_KjB7pWu8DQM3iFulLu7RI06qnt5K8S1A0eaE@github.com/viash-hub/htrnaseq"
"viash_version" : "0.9.0",
"git_commit" : "65dd41d8b1b4a307735c72320c96c0880c75f17f",
"git_remote" : "https://x-access-token:ghs_McZDF0yobnnHmOEb2Q4JaaB3pzr9mz1VbIOs@github.com/viash-hub/htrnaseq"
},
"package_config" : {
"name" : "htrnaseq",
"version" : "main",
"description" : "High-throughput pipeline [WIP]\n",
"viash_version" : "0.9.0-RC7",
"info" : {
"test_resources" : [
{
"path" : "gs://viash-hub-test-data/htrnaseq/v1/",
"dest" : "resources_test"
}
]
},
"viash_version" : "0.9.0",
"source" : "src",
"target" : "target",
"config_mods" : [
@@ -3115,12 +3209,15 @@ meta = [
// resolve dependencies dependencies (if any)
meta["root_dir"] = getRootDir()
include { combine_star_logs } from "${meta.resources_dir}/../../../nextflow/stats/combine_star_logs/main.nf"
include { generate_pool_statistics } from "${meta.resources_dir}/../../../nextflow/stats/generate_pool_statistics/main.nf"
include { generate_well_statistics } from "${meta.resources_dir}/../../../nextflow/stats/generate_well_statistics/main.nf"
include { well_demultiplex } from "${meta.resources_dir}/../../../nextflow/workflows/well_demultiplex/main.nf"
include { parallel_map_wf } from "${meta.resources_dir}/../../../nextflow/workflows/parallel_map_wf/main.nf"
include { groupWells } from "${meta.resources_dir}/../../../nextflow/workflows/utils/groupWells/main.nf"
include { concat_text } from "${meta.root_dir}/dependencies/vsh/vsh/craftbox/concat_text/nextflow/concat_text/main.nf"
include { create_eset } from "${meta.resources_dir}/../../../nextflow/eset/create_eset/main.nf"
include { create_fdata } from "${meta.resources_dir}/../../../nextflow/eset/create_fdata/main.nf"
include { create_pdata } from "${meta.resources_dir}/../../../nextflow/eset/create_pdata/main.nf"
// inner workflow
// user-provided Nextflow code
@@ -3129,72 +3226,68 @@ workflow run_wf {
input_ch
main:
output_ch = input_ch
| well_demultiplex.run(
fromState: { id, state ->
[
input_r1: state.input_r1,
input_r2: state.input_r2,
barcodesFasta: state.barcodesFasta,
]
},
toState: { id, result, state ->
state + result + [
fastq_output_r1: result.output_r1,
fastq_output_r2: result.output_r2,
input_r1: result.output_r1,
input_r2: result.output_r2,
]
},
directives: [label: ["midmem", "midcpu"]]
// The featureData only has one requirement: the genome annotation.
// It can be generated straight away.
f_data_ch = input_ch
| create_fdata.run(
directives: [label: ["lowmem", "lowcpu"]],
fromState: ["gtf": "annotation"],
toState: {id, result, state -> ["f_data": result.output]}
)
// TODO: Expand this into matching a whitelist/blacklist of barcodes
// ... and turn into separate component
| filter{ id, state -> state.barcode != "unknown" }
| concat_text.run(
key: "concat_txt_r1",
runIf: {id, state -> state.input_r1.size() > 1},
fromState: { id, state ->
[
input: state.input_r1,
gzip_output: true,
]
},
// Perform mapping of each well. The input here are events per pool,
// the output channel is one event per well.
mapping_ch = input_ch
| map {id, state ->
def n_barcodes = state.barcodesFasta.countFasta() as int
def newState = state + ["n_barcodes": n_barcodes]
// The header is the full header, the id is the part header up to the first whitespace character
// We do not allow whitespace in the header of the fasta file, so assert this.
def fasta_entries = state.barcodesFasta.splitFasta(record: ["id": true, "header": true, "seqString": true])
assert fasta_entries.every{it.id == it.header}, "The barcodes FASTA headers must not contain any whitespace!"
// Check if the fasta headers are unique
def fasta_ids = fasta_entries.collect{it.id}
assert fasta_ids.clone().unique() == fasta_ids, "The barcodes FASTA entries must have a unique name!"
// Check if the sequences are unique
def fasta_sequences = fasta_entries.collect{it.seqString}
assert fasta_sequences.clone().unique() == fasta_sequences, "The barcodes FASTA sequences must be unique!"
[id, newState]
}
| well_demultiplex.run(
fromState: [
"input_r1": "input_r1",
"input_r2": "input_r2",
"barcodesFasta": "barcodesFasta",
],
toState: { id, result, state ->
state + [ input_r1: [ result.output ] ]
}
)
| concat_text.run(
key: "concat_text_r2",
runIf: {id, state -> state.input_r2.size() > 1},
fromState: { id, state ->
[
input: state.input_r2,
gzip_output: true
def filtered_input = state.findAll{!["input_r1", "input_r2"].contains(it.key)}
def filtered_results = result.findAll{!["output_r1", "output_r2"].contains(it.key)}
def new_state = filtered_input + filtered_results + [
"fastq_output_r1": result.output_r1,
"fastq_output_r2": result.output_r2,
]
},
toState: { id, result, state ->
state + [ input_r2: [ result.output ] ]
return new_state
}
)
| parallel_map_wf.run(
fromState: {id, state ->
def star_output = state.star_output[0]
[
"input_r1": state.input_r1[0],
"input_r2": state.input_r2[0],
"input_r1": state.fastq_output_r1[0],
"input_r2": state.fastq_output_r2[0],
"barcode": state.barcode,
"pool": state.pool,
"output": state.star_output[0],
"genomeDir": state.genomeDir,
]
},
toState: {id, result, state ->
state + ["star_output": result.output]
},
toState: ["star_output": "output"]
)
// From the mapped wells, create statistics based on the BAM file
// and join the events back to pool level.
pool_ch = mapping_ch
| generate_well_statistics.run(
directives: [label: ["verylowmem", "verylowcpu"]],
fromState: { id, state ->
[
"input": state.star_output.resolve('Aligned.sortedByCoord.out.bam'),
@@ -3203,39 +3296,132 @@ workflow run_wf {
},
toState: [
"nrReadsNrGenesPerChrom": "nrReadsNrGenesPerChrom",
"nrReadsNrUMIsPerCB": "nrReadsNrUMIsPerCB",
]
)
| map {id, state ->
[state.pool, id, state]
| map {id, state ->
// Create a special groupKey, such that groupTuple
// knows when all the barcodes have been grouped into 1 event.
// This way the processing is as distributed as possible.
def key = groupKey(state.pool, state.n_barcodes)
def newEvent = [key, state]
return newEvent
}
| groupTuple(by: 0, sort: "hash")
| map {id, well_ids, states ->
def collected_state = [
// Use a custom sorting function because sort: 'hash'
// requires a hash to be calculated on every entry of the state
// This is inefficient when the number of events is large
// (i.e large number or barcodes).
// Sorting on lexographical order of the barcode is sufficient here.
| groupTuple(sort: {a, b -> a.barcode <=> b.barcode})
| map {id, states ->
// Gather the keys from all states. for some state items,
// we need gather all the different items from across the states
def barcodes = states.collect{it.barcode}
assert barcodes.clone().unique().size() == barcodes.size(), \
"Error when gathering information for pool ${id}, barcodes are not unique!"
def custom_state = [
"fastq_output_r1": states.collect{it.fastq_output_r1[0]},
"fastq_output_r2": states.collect{it.fastq_output_r2[0]},
"barcode": barcodes,
"star_output": states.collect{it.star_output},
"nrReadsNrGenesPerChrom": states.collect{it.nrReadsNrGenesPerChrom},
]
def newState = states[0] + collected_state
[id, newState]
//For many state items, the value is the same across states.
def other_state_keys = states.inject([].toSet()){ current_keys, state ->
def new_keys = current_keys + state.keySet()
return new_keys
}.minus(custom_state.keySet())
// All other state should have a unique value
def old_state_items = other_state_keys.inject([:]){ old_state, argument_name ->
argument_values = states.collect{it.get(argument_name)}.unique()
assert argument_values.size() == 1, "Arguments should be the same across modalities. Please report this \
as a bug. Argument name: $argument_name, \
argument value: $argument_values"
def argument_value
argument_values.each { argument_value = it }
def current_state = old_state + [(argument_name): argument_value]
return current_state
}
def new_state = custom_state + old_state_items
[id.getGroupTarget(), new_state]
}
// The well statistics are merged on pool level.
pool_statistics_ch = pool_ch
| generate_pool_statistics.run(
directives: ["label": ["lowmem", "verylowcpu"]],
fromState: [
"nrReadsNrGenesPerChrom": "nrReadsNrGenesPerChrom",
],
toState: {id, result, state ->
state + ["nrReadsNrGenesPerChrom": result.nrReadsNrGenesPerChromPool]
}
toState: [
"nrReadsNrGenesPerChromPool": "nrReadsNrGenesPerChromPool"
]
)
// The statistics from the STAR logs of different wells are joined
// on pool level
star_logs_ch = pool_ch
| combine_star_logs.run(
directives: ["label": ["lowmem", "verylowcpu"]],
fromState: {id, state -> [
"star_logs": state.star_output.collect{it.resolve("Log.final.out")},
"gene_summary_logs": state.star_output.collect{it.resolve("Solo.out/Gene/Summary.csv")},
"reads_per_gene_logs": state.star_output.collect{it.resolve("ReadsPerGene.out.tab")},
"barcodes": state.barcode,
"output": state.star_qc_metrics
]
},
toState: [
"star_qc_metrics": "output",
]
)
p_data_ch = star_logs_ch.join(pool_statistics_ch, remainder: true)
| map {id, star_logs_state, pool_statistics_state ->
def newState = star_logs_state + ["nrReadsNrGenesPerChromPool": pool_statistics_state.nrReadsNrGenesPerChromPool]
return [id, newState]
}
| create_pdata.run(
directives: [label: ["lowmem", "lowcpu"]],
fromState: [
"star_stats_file": "star_qc_metrics",
"nrReadsNrGenesPerChromPool": "nrReadsNrGenesPerChromPool",
],
toState: ["p_data": "output"],
)
output_ch = p_data_ch.join(f_data_ch, remainder: true)
| map {id, p_data_state, f_data_state ->
def newState = p_data_state + ["f_data": f_data_state["f_data"]]
[id, newState]
}
| create_eset.run(
directives: [label: ["lowmem", "lowcpu"]],
fromState: [
"pDataFile": "p_data",
"fDataFile": "f_data",
"mappingDir": "star_output",
"output": "eset",
"barcodes": "barcode",
"poolName": "pool",
],
toState: [
"eset": "output",
]
)
| niceView()
| setState([
"star_output",
"fastq_output_r1",
"fastq_output_r2",
"star_output",
"nrReadsNrGenesPerChrom",
"star_output": "star_output",
"fastq_output_r1": "fastq_output_r1",
"fastq_output_r2": "fastq_output_r2",
"star_output": "star_output",
"nrReadsNrGenesPerChrom": "nrReadsNrGenesPerChromPool",
"star_qc_metrics": "star_qc_metrics",
"eset": "eset",
"f_data": "f_data",
"p_data": "p_data"
])
emit:
output_ch
}

View File

@@ -1,26 +1,88 @@
executor {
$k8s {
submitRateLimit = '10sec'
pollInterval = '1 sec'
}
}
process {
// Default resources for components that hardly do any processing
memory = { 2.GB * task.attempt }
cpus = 1
container = 'nextflow/bash:latest'
// default resources
memory = { 8.Gb * task.attempt }
cpus = 8
maxForks = 36
// Retry for exit codes that have something to do with memory issues
errorStrategy = { task.exitStatus in 137..140 ? 'retry' : 'terminate' }
maxRetries = 3
maxMemory = null
maxMemory = 192.GB
// Resource labels
withLabel: singlecpu { cpus = 1 }
withLabel: lowcpu { cpus = 4 }
withLabel: midcpu { cpus = 10 }
withLabel: highcpu { cpus = 20 }
withLabel: verylowcpu { cpus = 2 }
withLabel: lowcpu { cpus = 8 }
withLabel: midcpu { cpus = 16 }
withLabel: highcpu { cpus = 32 }
withLabel: lowmem { memory = { get_memory( 4.GB * task.attempt ) } }
withLabel: midmem { memory = { get_memory( 25.GB * task.attempt ) } }
withLabel: highmem { memory = { get_memory( 50.GB * task.attempt ) } }
withLabel: veryhighmem { memory = { get_memory( 75.GB * task.attempt ) } }
withLabel: verylowmem { memory = { get_memory( 4.GB * task.attempt ) } }
withLabel: lowmem { memory = { get_memory( 8.GB * task.attempt ) } }
withLabel: midmem { memory = { get_memory( 16.GB * task.attempt ) } }
withLabel: highmem { memory = { get_memory( 64.GB * task.attempt ) } }
}
profiles {
// detect tempdir
tempDir = java.nio.file.Paths.get(
System.getenv('NXF_TEMP') ?:
System.getenv('VIASH_TEMP') ?:
System.getenv('TEMPDIR') ?:
System.getenv('TMPDIR') ?:
'/tmp'
).toAbsolutePath()
mount_temp {
docker.temp = tempDir
podman.temp = tempDir
charliecloud.temp = tempDir
}
no_publish {
process {
withName: '.*' {
publishDir = [
enabled: false
]
}
}
}
docker {
docker.fixOwnership = true
docker.enabled = true
// docker.userEmulation = true
singularity.enabled = false
podman.enabled = false
shifter.enabled = false
charliecloud.enabled = false
}
local {
// This config is for local processing.
process {
maxMemory = 25.GB
withLabel: verylowcpu { cpus = 2 }
withLabel: lowcpu { cpus = 4 }
withLabel: midcpu { cpus = 6 }
withLabel: highcpu { cpus = 12 }
withLabel: lowmem { memory = { get_memory( 8.GB * task.attempt ) } }
withLabel: midmem { memory = { get_memory( 12.GB * task.attempt ) } }
withLabel: highmem { memory = { get_memory( 20.GB * task.attempt ) } }
}
}
}
def get_memory(to_compare) {
if (!process.containsKey("maxMemory") || !process.maxMemory) {
return to_compare

View File

@@ -17,8 +17,8 @@
"input_r1": {
"type":
"string",
"description": "Type: `file`, required. R1",
"help_text": "Type: `file`, required. R1"
"description": "Type: List of `file`, required, multiple_sep: `\";\"`. Forward reads in FASTQ format",
"help_text": "Type: List of `file`, required, multiple_sep: `\";\"`. Forward reads in FASTQ format. Multiple files can be provided which will\nbe demultiplexed separately before joining the results for each individual well.\n"
}
@@ -27,8 +27,8 @@
"input_r2": {
"type":
"string",
"description": "Type: `file`, required. R2",
"help_text": "Type: `file`, required. R2"
"description": "Type: List of `file`, required, multiple_sep: `\";\"`. Reverse reads in FASTQ format",
"help_text": "Type: List of `file`, required, multiple_sep: `\";\"`. Reverse reads in FASTQ format. Multiple files can be provided which will\nbe demultiplexed separately before joining the results for each individual well.\n"
}
@@ -53,6 +53,16 @@
}
,
"annotation": {
"type":
"string",
"description": "Type: `file`, required. ",
"help_text": "Type: `file`, required. "
}
}
},
@@ -107,6 +117,50 @@
}
,
"star_qc_metrics": {
"type":
"string",
"description": "Type: `file`, required, default: `$id.$key.star_qc_metrics.txt`. ",
"help_text": "Type: `file`, required, default: `$id.$key.star_qc_metrics.txt`. "
,
"default": "$id.$key.star_qc_metrics.txt"
}
,
"eset": {
"type":
"string",
"description": "Type: `file`, required, default: `$id.$key.eset.rds`. ",
"help_text": "Type: `file`, required, default: `$id.$key.eset.rds`. "
,
"default": "$id.$key.eset.rds"
}
,
"f_data": {
"type":
"string",
"description": "Type: `file`, required, default: `$id.$key.f_data.tsv`. ",
"help_text": "Type: `file`, required, default: `$id.$key.f_data.tsv`. "
,
"default": "$id.$key.f_data.tsv"
}
,
"p_data": {
"type":
"string",
"description": "Type: `file`, required, default: `$id.$key.p_data.tsv`. ",
"help_text": "Type: `file`, required, default: `$id.$key.p_data.tsv`. "
,
"default": "$id.$key.p_data.tsv"
}
}
},

View File

@@ -160,9 +160,9 @@ build_info:
engine: "native|native"
output: "target/nextflow/workflows/parallel_map_wf"
executable: "target/nextflow/workflows/parallel_map_wf/main.nf"
viash_version: "0.9.0-RC7"
git_commit: "cf9797232db1306bfd5696287928cababe317d99"
git_remote: "https://x-access-token:ghs_KjB7pWu8DQM3iFulLu7RI06qnt5K8S1A0eaE@github.com/viash-hub/htrnaseq"
viash_version: "0.9.0"
git_commit: "65dd41d8b1b4a307735c72320c96c0880c75f17f"
git_remote: "https://x-access-token:ghs_McZDF0yobnnHmOEb2Q4JaaB3pzr9mz1VbIOs@github.com/viash-hub/htrnaseq"
dependencies:
- "target/nextflow/parallel_map"
- "target/nextflow/workflows/utils/groupWells"
@@ -170,8 +170,11 @@ package_config:
name: "htrnaseq"
version: "main"
description: "High-throughput pipeline [WIP]\n"
info: null
viash_version: "0.9.0-RC7"
info:
test_resources:
- path: "gs://viash-hub-test-data/htrnaseq/v1/"
dest: "resources_test"
viash_version: "0.9.0"
source: "src"
target: "target"
config_mods:

View File

@@ -1,8 +1,8 @@
// parallel_map_wf main
//
// This wrapper script is auto-generated by viash 0.9.0-RC7 and is thus a
// derivative work thereof. This software comes with ABSOLUTELY NO WARRANTY from
// Data Intuitive.
// This wrapper script is auto-generated by viash 0.9.0 and is thus a derivative
// work thereof. This software comes with ABSOLUTELY NO WARRANTY from Data
// Intuitive.
//
// The component may contain files which fall under a different license. The
// authors of this component should specify the license in the header of such
@@ -1728,7 +1728,9 @@ def publishStates(Map args) {
def yamlFilename = yamlTemplate_
.replaceAll('\\$id', id_)
.replaceAll('\\$\\{id\\}', id_)
.replaceAll('\\$key', key_)
.replaceAll('\\$\\{key\\}', key_)
// TODO: do the pathnames in state_ match up with the outputFilenames_?
@@ -1799,7 +1801,9 @@ def publishStatesByConfig(Map args) {
def yamlTemplate = params.containsKey("output_state") ? params.output_state : '$id.$key.state.yaml'
def yamlFilename = yamlTemplate
.replaceAll('\\$id', id_)
.replaceAll('\\$\\{id\\}', id_)
.replaceAll('\\$key', key_)
.replaceAll('\\$\\{key\\}', key_)
def yamlDir = java.nio.file.Paths.get(yamlFilename).getParent()
// the processed state is a list of [key, value, inputPath, outputFilename] tuples, where
@@ -1841,7 +1845,9 @@ def publishStatesByConfig(Map args) {
// instantiate the template
def filename = filenameTemplate
.replaceAll('\\$id', id_)
.replaceAll('\\$\\{id\\}', id_)
.replaceAll('\\$key', key_)
.replaceAll('\\$\\{key\\}', key_)
if (par.multiple) {
// if the parameter is multiple: true, the filename
// should contain a wildcard '*' that is replaced with
@@ -2995,15 +3001,23 @@ meta = [
"runner" : "nextflow",
"engine" : "native|native",
"output" : "target/nextflow/workflows/parallel_map_wf",
"viash_version" : "0.9.0-RC7",
"git_commit" : "cf9797232db1306bfd5696287928cababe317d99",
"git_remote" : "https://x-access-token:ghs_KjB7pWu8DQM3iFulLu7RI06qnt5K8S1A0eaE@github.com/viash-hub/htrnaseq"
"viash_version" : "0.9.0",
"git_commit" : "65dd41d8b1b4a307735c72320c96c0880c75f17f",
"git_remote" : "https://x-access-token:ghs_McZDF0yobnnHmOEb2Q4JaaB3pzr9mz1VbIOs@github.com/viash-hub/htrnaseq"
},
"package_config" : {
"name" : "htrnaseq",
"version" : "main",
"description" : "High-throughput pipeline [WIP]\n",
"viash_version" : "0.9.0-RC7",
"info" : {
"test_resources" : [
{
"path" : "gs://viash-hub-test-data/htrnaseq/v1/",
"dest" : "resources_test"
}
]
},
"viash_version" : "0.9.0",
"source" : "src",
"target" : "target",
"config_mods" : [
@@ -3044,21 +3058,17 @@ workflow run_wf {
main:
pool_ch = input_ch
| groupWells.run(
fromState: { id, state ->
[
"input_r1": state.input_r1,
"input_r2": state.input_r2,
"well": state.barcode,
"pool": state.pool,
]
},
toState: { id, result, state ->
state + [
"wells": result.wells,
"input_r1": result.output_r1,
"input_r2": result.output_r2,
]
}
fromState: [
"input_r1": "input_r1",
"input_r2": "input_r2",
"well": "barcode",
"pool": "pool",
],
toState: [
"wells": "wells",
"input_r1": "output_r1",
"input_r2": "output_r2",
]
)
| parallel_map.run(
fromState: { id, state ->
@@ -3073,36 +3083,41 @@ workflow run_wf {
"output": state.output,
]
},
toState: { id, result, state ->
state + [
output: result.output,
]
},
directives: [label: ["midmem", "midcpu"]]
toState: ["output": "output"],
directives: ["label": ["highmem", "lowcpu"]],
)
| setState(["output"])
| setState(["output", "pool"])
// input_ch is on pool level, while parallel_map
// outputs multiple events per pool.
// Join the results back to pool level
input_join_ch = input_ch
| map {id, state ->
[state.pool, id, state]
def newEvent = [state.pool, id, state]
return newEvent
}
output_ch = input_join_ch.combine(pool_ch, by: 0)
| map {pool, well_id, state_well, state_pool ->
well_output = state_pool.output.findAll{star_output_dir ->
def well_output = state_pool.output.findAll{star_output_dir ->
def barcodes_list = []
def barcode_file_regex = ~/.*\/raw\/barcodes\.tsv$/
star_output_dir.eachFileRecurse{barcode_file ->
if (barcode_file =~ barcode_file_regex) {
assert barcode_file.countLines() == 1, "Expected only one barcode in a single STAR output."
barcodes_list.add(barcode_file.text.trim())
}
}
assert barcodes_list.size() == 1, "Exactly one file should have matched the barcodes file regex (found: $barcodes_list)."
// Get the barcode from the STAR file.
// One STAR output contains the results for one
// well barcode. We can look for the barcode in
// the 'Solo.out/Gene/raw/barcode.tsv' file.
def barcodes_files = files("${star_output_dir}/Solo.out/Gene/raw/barcodes.tsv")
assert barcodes_files.size() == 1, \
"Exactly one file should have matched the barcodes files (found: $barcodes_files)."
def barcode
barcodes_list.each{ it -> barcode = it }
barcodes_files.each{ it ->
assert it.countLines() == 1,
"Expected only one barcode in a single STAR output."
barcode = it.text.trim()
}
return barcode == state_well.barcode
}
assert well_output.size() == 1, "Two or more outputs from the mapping seemed to have processed barcode '$barcode'."
assert well_output.size() == 1, \
"Two or more outputs from the mapping seemed to have processed barcode '$barcode'."
[well_id, ["output": well_output[0]]]
}

View File

@@ -1,26 +1,88 @@
executor {
$k8s {
submitRateLimit = '10sec'
pollInterval = '1 sec'
}
}
process {
// Default resources for components that hardly do any processing
memory = { 2.GB * task.attempt }
cpus = 1
container = 'nextflow/bash:latest'
// default resources
memory = { 8.Gb * task.attempt }
cpus = 8
maxForks = 36
// Retry for exit codes that have something to do with memory issues
errorStrategy = { task.exitStatus in 137..140 ? 'retry' : 'terminate' }
maxRetries = 3
maxMemory = null
maxMemory = 192.GB
// Resource labels
withLabel: singlecpu { cpus = 1 }
withLabel: lowcpu { cpus = 4 }
withLabel: midcpu { cpus = 10 }
withLabel: highcpu { cpus = 20 }
withLabel: verylowcpu { cpus = 2 }
withLabel: lowcpu { cpus = 8 }
withLabel: midcpu { cpus = 16 }
withLabel: highcpu { cpus = 32 }
withLabel: lowmem { memory = { get_memory( 4.GB * task.attempt ) } }
withLabel: midmem { memory = { get_memory( 25.GB * task.attempt ) } }
withLabel: highmem { memory = { get_memory( 50.GB * task.attempt ) } }
withLabel: veryhighmem { memory = { get_memory( 75.GB * task.attempt ) } }
withLabel: verylowmem { memory = { get_memory( 4.GB * task.attempt ) } }
withLabel: lowmem { memory = { get_memory( 8.GB * task.attempt ) } }
withLabel: midmem { memory = { get_memory( 16.GB * task.attempt ) } }
withLabel: highmem { memory = { get_memory( 64.GB * task.attempt ) } }
}
profiles {
// detect tempdir
tempDir = java.nio.file.Paths.get(
System.getenv('NXF_TEMP') ?:
System.getenv('VIASH_TEMP') ?:
System.getenv('TEMPDIR') ?:
System.getenv('TMPDIR') ?:
'/tmp'
).toAbsolutePath()
mount_temp {
docker.temp = tempDir
podman.temp = tempDir
charliecloud.temp = tempDir
}
no_publish {
process {
withName: '.*' {
publishDir = [
enabled: false
]
}
}
}
docker {
docker.fixOwnership = true
docker.enabled = true
// docker.userEmulation = true
singularity.enabled = false
podman.enabled = false
shifter.enabled = false
charliecloud.enabled = false
}
local {
// This config is for local processing.
process {
maxMemory = 25.GB
withLabel: verylowcpu { cpus = 2 }
withLabel: lowcpu { cpus = 4 }
withLabel: midcpu { cpus = 6 }
withLabel: highcpu { cpus = 12 }
withLabel: lowmem { memory = { get_memory( 8.GB * task.attempt ) } }
withLabel: midmem { memory = { get_memory( 12.GB * task.attempt ) } }
withLabel: highmem { memory = { get_memory( 20.GB * task.attempt ) } }
}
}
}
def get_memory(to_compare) {
if (!process.containsKey("maxMemory") || !process.maxMemory) {
return to_compare

View File

@@ -170,15 +170,18 @@ build_info:
engine: "native"
output: "target/nextflow/workflows/utils/groupWells"
executable: "target/nextflow/workflows/utils/groupWells/main.nf"
viash_version: "0.9.0-RC7"
git_commit: "cf9797232db1306bfd5696287928cababe317d99"
git_remote: "https://x-access-token:ghs_KjB7pWu8DQM3iFulLu7RI06qnt5K8S1A0eaE@github.com/viash-hub/htrnaseq"
viash_version: "0.9.0"
git_commit: "65dd41d8b1b4a307735c72320c96c0880c75f17f"
git_remote: "https://x-access-token:ghs_McZDF0yobnnHmOEb2Q4JaaB3pzr9mz1VbIOs@github.com/viash-hub/htrnaseq"
package_config:
name: "htrnaseq"
version: "main"
description: "High-throughput pipeline [WIP]\n"
info: null
viash_version: "0.9.0-RC7"
info:
test_resources:
- path: "gs://viash-hub-test-data/htrnaseq/v1/"
dest: "resources_test"
viash_version: "0.9.0"
source: "src"
target: "target"
config_mods:

View File

@@ -1,8 +1,8 @@
// groupWells main
//
// This wrapper script is auto-generated by viash 0.9.0-RC7 and is thus a
// derivative work thereof. This software comes with ABSOLUTELY NO WARRANTY from
// Data Intuitive.
// This wrapper script is auto-generated by viash 0.9.0 and is thus a derivative
// work thereof. This software comes with ABSOLUTELY NO WARRANTY from Data
// Intuitive.
//
// The component may contain files which fall under a different license. The
// authors of this component should specify the license in the header of such
@@ -1728,7 +1728,9 @@ def publishStates(Map args) {
def yamlFilename = yamlTemplate_
.replaceAll('\\$id', id_)
.replaceAll('\\$\\{id\\}', id_)
.replaceAll('\\$key', key_)
.replaceAll('\\$\\{key\\}', key_)
// TODO: do the pathnames in state_ match up with the outputFilenames_?
@@ -1799,7 +1801,9 @@ def publishStatesByConfig(Map args) {
def yamlTemplate = params.containsKey("output_state") ? params.output_state : '$id.$key.state.yaml'
def yamlFilename = yamlTemplate
.replaceAll('\\$id', id_)
.replaceAll('\\$\\{id\\}', id_)
.replaceAll('\\$key', key_)
.replaceAll('\\$\\{key\\}', key_)
def yamlDir = java.nio.file.Paths.get(yamlFilename).getParent()
// the processed state is a list of [key, value, inputPath, outputFilename] tuples, where
@@ -1841,7 +1845,9 @@ def publishStatesByConfig(Map args) {
// instantiate the template
def filename = filenameTemplate
.replaceAll('\\$id', id_)
.replaceAll('\\$\\{id\\}', id_)
.replaceAll('\\$key', key_)
.replaceAll('\\$\\{key\\}', key_)
if (par.multiple) {
// if the parameter is multiple: true, the filename
// should contain a wildcard '*' that is replaced with
@@ -3006,15 +3012,23 @@ meta = [
"runner" : "nextflow",
"engine" : "native",
"output" : "target/nextflow/workflows/utils/groupWells",
"viash_version" : "0.9.0-RC7",
"git_commit" : "cf9797232db1306bfd5696287928cababe317d99",
"git_remote" : "https://x-access-token:ghs_KjB7pWu8DQM3iFulLu7RI06qnt5K8S1A0eaE@github.com/viash-hub/htrnaseq"
"viash_version" : "0.9.0",
"git_commit" : "65dd41d8b1b4a307735c72320c96c0880c75f17f",
"git_remote" : "https://x-access-token:ghs_McZDF0yobnnHmOEb2Q4JaaB3pzr9mz1VbIOs@github.com/viash-hub/htrnaseq"
},
"package_config" : {
"name" : "htrnaseq",
"version" : "main",
"description" : "High-throughput pipeline [WIP]\n",
"viash_version" : "0.9.0-RC7",
"info" : {
"test_resources" : [
{
"path" : "gs://viash-hub-test-data/htrnaseq/v1/",
"dest" : "resources_test"
}
]
},
"viash_version" : "0.9.0",
"source" : "src",
"target" : "target",
"config_mods" : [

View File

@@ -1,26 +1,88 @@
executor {
$k8s {
submitRateLimit = '10sec'
pollInterval = '1 sec'
}
}
process {
// Default resources for components that hardly do any processing
memory = { 2.GB * task.attempt }
cpus = 1
container = 'nextflow/bash:latest'
// default resources
memory = { 8.Gb * task.attempt }
cpus = 8
maxForks = 36
// Retry for exit codes that have something to do with memory issues
errorStrategy = { task.exitStatus in 137..140 ? 'retry' : 'terminate' }
maxRetries = 3
maxMemory = null
maxMemory = 192.GB
// Resource labels
withLabel: singlecpu { cpus = 1 }
withLabel: lowcpu { cpus = 4 }
withLabel: midcpu { cpus = 10 }
withLabel: highcpu { cpus = 20 }
withLabel: verylowcpu { cpus = 2 }
withLabel: lowcpu { cpus = 8 }
withLabel: midcpu { cpus = 16 }
withLabel: highcpu { cpus = 32 }
withLabel: lowmem { memory = { get_memory( 4.GB * task.attempt ) } }
withLabel: midmem { memory = { get_memory( 25.GB * task.attempt ) } }
withLabel: highmem { memory = { get_memory( 50.GB * task.attempt ) } }
withLabel: veryhighmem { memory = { get_memory( 75.GB * task.attempt ) } }
withLabel: verylowmem { memory = { get_memory( 4.GB * task.attempt ) } }
withLabel: lowmem { memory = { get_memory( 8.GB * task.attempt ) } }
withLabel: midmem { memory = { get_memory( 16.GB * task.attempt ) } }
withLabel: highmem { memory = { get_memory( 64.GB * task.attempt ) } }
}
profiles {
// detect tempdir
tempDir = java.nio.file.Paths.get(
System.getenv('NXF_TEMP') ?:
System.getenv('VIASH_TEMP') ?:
System.getenv('TEMPDIR') ?:
System.getenv('TMPDIR') ?:
'/tmp'
).toAbsolutePath()
mount_temp {
docker.temp = tempDir
podman.temp = tempDir
charliecloud.temp = tempDir
}
no_publish {
process {
withName: '.*' {
publishDir = [
enabled: false
]
}
}
}
docker {
docker.fixOwnership = true
docker.enabled = true
// docker.userEmulation = true
singularity.enabled = false
podman.enabled = false
shifter.enabled = false
charliecloud.enabled = false
}
local {
// This config is for local processing.
process {
maxMemory = 25.GB
withLabel: verylowcpu { cpus = 2 }
withLabel: lowcpu { cpus = 4 }
withLabel: midcpu { cpus = 6 }
withLabel: highcpu { cpus = 12 }
withLabel: lowmem { memory = { get_memory( 8.GB * task.attempt ) } }
withLabel: midmem { memory = { get_memory( 12.GB * task.attempt ) } }
withLabel: highmem { memory = { get_memory( 20.GB * task.attempt ) } }
}
}
}
def get_memory(to_compare) {
if (!process.containsKey("maxMemory") || !process.maxMemory) {
return to_compare

View File

@@ -6,23 +6,27 @@ argument_groups:
arguments:
- type: "file"
name: "--input_r1"
description: "R1"
description: "Forward reads in FASTQ format. Multiple files can be provided which\
\ will\nbe demultiplexed separately before joining the results for each individual\
\ well.\n"
info: null
must_exist: true
create_parent: true
required: true
direction: "input"
multiple: false
multiple: true
multiple_sep: ";"
- type: "file"
name: "--input_r2"
description: "R2"
description: "Reverse reads in FASTQ format. Multiple files can be provided which\
\ will\nbe demultiplexed separately before joining the results for each individual\
\ well.\n"
info: null
must_exist: true
create_parent: true
required: true
direction: "input"
multiple: false
multiple: true
multiple_sep: ";"
- type: "file"
name: "--barcodesFasta"
@@ -112,11 +116,20 @@ dependencies:
repository:
type: "vsh"
repo: "biobox"
tag: "main"
- name: "concat_text"
repository:
type: "vsh"
repo: "craftbox"
tag: "v0.1.0"
repositories:
- type: "vsh"
name: "bb"
repo: "biobox"
tag: "main"
- type: "vsh"
name: "cb"
repo: "craftbox"
tag: "v0.1.0"
license: "MIT"
links:
@@ -196,17 +209,21 @@ build_info:
engine: "native|native"
output: "target/nextflow/workflows/well_demultiplex"
executable: "target/nextflow/workflows/well_demultiplex/main.nf"
viash_version: "0.9.0-RC7"
git_commit: "cf9797232db1306bfd5696287928cababe317d99"
git_remote: "https://x-access-token:ghs_KjB7pWu8DQM3iFulLu7RI06qnt5K8S1A0eaE@github.com/viash-hub/htrnaseq"
viash_version: "0.9.0"
git_commit: "65dd41d8b1b4a307735c72320c96c0880c75f17f"
git_remote: "https://x-access-token:ghs_McZDF0yobnnHmOEb2Q4JaaB3pzr9mz1VbIOs@github.com/viash-hub/htrnaseq"
dependencies:
- "target/dependencies/vsh/vsh/biobox/v0.1.0/nextflow/cutadapt"
- "target/dependencies/vsh/vsh/biobox/main/nextflow/cutadapt"
- "target/dependencies/vsh/vsh/craftbox/v0.1.0/nextflow/concat_text"
package_config:
name: "htrnaseq"
version: "main"
description: "High-throughput pipeline [WIP]\n"
info: null
viash_version: "0.9.0-RC7"
info:
test_resources:
- path: "gs://viash-hub-test-data/htrnaseq/v1/"
dest: "resources_test"
viash_version: "0.9.0"
source: "src"
target: "target"
config_mods:

View File

@@ -1,8 +1,8 @@
// well_demultiplex main
//
// This wrapper script is auto-generated by viash 0.9.0-RC7 and is thus a
// derivative work thereof. This software comes with ABSOLUTELY NO WARRANTY from
// Data Intuitive.
// This wrapper script is auto-generated by viash 0.9.0 and is thus a derivative
// work thereof. This software comes with ABSOLUTELY NO WARRANTY from Data
// Intuitive.
//
// The component may contain files which fall under a different license. The
// authors of this component should specify the license in the header of such
@@ -1728,7 +1728,9 @@ def publishStates(Map args) {
def yamlFilename = yamlTemplate_
.replaceAll('\\$id', id_)
.replaceAll('\\$\\{id\\}', id_)
.replaceAll('\\$key', key_)
.replaceAll('\\$\\{key\\}', key_)
// TODO: do the pathnames in state_ match up with the outputFilenames_?
@@ -1799,7 +1801,9 @@ def publishStatesByConfig(Map args) {
def yamlTemplate = params.containsKey("output_state") ? params.output_state : '$id.$key.state.yaml'
def yamlFilename = yamlTemplate
.replaceAll('\\$id', id_)
.replaceAll('\\$\\{id\\}', id_)
.replaceAll('\\$key', key_)
.replaceAll('\\$\\{key\\}', key_)
def yamlDir = java.nio.file.Paths.get(yamlFilename).getParent()
// the processed state is a list of [key, value, inputPath, outputFilename] tuples, where
@@ -1841,7 +1845,9 @@ def publishStatesByConfig(Map args) {
// instantiate the template
def filename = filenameTemplate
.replaceAll('\\$id', id_)
.replaceAll('\\$\\{id\\}', id_)
.replaceAll('\\$key', key_)
.replaceAll('\\$\\{key\\}', key_)
if (par.multiple) {
// if the parameter is multiple: true, the filename
// should contain a wildcard '*' that is replaced with
@@ -2808,23 +2814,23 @@ meta = [
{
"type" : "file",
"name" : "--input_r1",
"description" : "R1",
"description" : "Forward reads in FASTQ format. Multiple files can be provided which will\nbe demultiplexed separately before joining the results for each individual well.\n",
"must_exist" : true,
"create_parent" : true,
"required" : true,
"direction" : "input",
"multiple" : false,
"multiple" : true,
"multiple_sep" : ";"
},
{
"type" : "file",
"name" : "--input_r2",
"description" : "R2",
"description" : "Reverse reads in FASTQ format. Multiple files can be provided which will\nbe demultiplexed separately before joining the results for each individual well.\n",
"must_exist" : true,
"create_parent" : true,
"required" : true,
"direction" : "input",
"multiple" : false,
"multiple" : true,
"multiple_sep" : ";"
},
{
@@ -2940,6 +2946,14 @@ meta = [
"repository" : {
"type" : "vsh",
"repo" : "biobox",
"tag" : "main"
}
},
{
"name" : "concat_text",
"repository" : {
"type" : "vsh",
"repo" : "craftbox",
"tag" : "v0.1.0"
}
}
@@ -2949,6 +2963,12 @@ meta = [
"type" : "vsh",
"name" : "bb",
"repo" : "biobox",
"tag" : "main"
},
{
"type" : "vsh",
"name" : "cb",
"repo" : "craftbox",
"tag" : "v0.1.0"
}
],
@@ -3043,15 +3063,23 @@ meta = [
"runner" : "nextflow",
"engine" : "native|native",
"output" : "target/nextflow/workflows/well_demultiplex",
"viash_version" : "0.9.0-RC7",
"git_commit" : "cf9797232db1306bfd5696287928cababe317d99",
"git_remote" : "https://x-access-token:ghs_KjB7pWu8DQM3iFulLu7RI06qnt5K8S1A0eaE@github.com/viash-hub/htrnaseq"
"viash_version" : "0.9.0",
"git_commit" : "65dd41d8b1b4a307735c72320c96c0880c75f17f",
"git_remote" : "https://x-access-token:ghs_McZDF0yobnnHmOEb2Q4JaaB3pzr9mz1VbIOs@github.com/viash-hub/htrnaseq"
},
"package_config" : {
"name" : "htrnaseq",
"version" : "main",
"description" : "High-throughput pipeline [WIP]\n",
"viash_version" : "0.9.0-RC7",
"info" : {
"test_resources" : [
{
"path" : "gs://viash-hub-test-data/htrnaseq/v1/",
"dest" : "resources_test"
}
]
},
"viash_version" : "0.9.0",
"source" : "src",
"target" : "target",
"config_mods" : [
@@ -3080,7 +3108,8 @@ meta = [
// resolve dependencies dependencies (if any)
meta["root_dir"] = getRootDir()
include { cutadapt } from "${meta.root_dir}/dependencies/vsh/vsh/biobox/v0.1.0/nextflow/cutadapt/main.nf"
include { cutadapt } from "${meta.root_dir}/dependencies/vsh/vsh/biobox/main/nextflow/cutadapt/main.nf"
include { concat_text } from "${meta.root_dir}/dependencies/vsh/vsh/craftbox/v0.1.0/nextflow/concat_text/main.nf"
// inner workflow
// user-provided Nextflow code
@@ -3090,61 +3119,134 @@ workflow run_wf {
main:
output_ch = input_ch
| flatMap {id, state ->
assert state.input_r1.size() == state.input_r2.size(), "Expected equal number of inputs for R1 and R2"
def n_lanes = state.input_r1.size()
[state.input_r1, state.input_r2].transpose().withIndex().collect{ input_pair, index ->
def single_input_r1 = input_pair[0]
def single_input_r2 = input_pair[1]
def newState = state + ["input_r1": single_input_r1,
"input_r2": single_input_r2,
"pool": id,
"lane_sorting": index,
"n_lanes": n_lanes]
def newId = id + "_" + index
[newId, newState]
}
}
| cutadapt.run(
// TODO: Remove hard-coded directives and replace with profiles
directives: [
cpus: 4
],
directives: [label: ["highmem", "midcpu"]],
fromState: { id, state ->
def new_output = ("fastq_${state.lane_sorting}/*_001.fastq")
[
input: state.input_r1,
input_r2: state.input_r2,
no_indels: true,
action: "none",
front_fasta: state.barcodesFasta,
output: "fastq/*_001.fastq"
output: new_output,
error_rate: 0.10,
demultiplex_mode: "single",
]
},
toState: { id, result, state ->
[
def newState = [
pool: state.pool,
n_lanes: state.n_lanes,
output: result.output,
lane_sorting: state.lane_sorting,
]
return newState
}
)
// Parse the file names to obtain metadata about the output
| flatMap{ id, state ->
def pool = state.pool
state.output.collect{ p ->
def barcode = (p =~ /.*\\/([ACTG]*|unknown)_R?.*/)[0][1]
def pair_end = (p =~ /.*_(R[12])_.*/)[0][1]
def lane = (p =~ /.*_(L\d+).*/) ? (p =~ /.*_(L\d+).*/)[0][1] : "NA"
def new_id = id + "__" + barcode
def new_id = pool + "__" + barcode
def group_key = groupKey(new_id, state.n_lanes * 2)
[
new_id,
group_key,
[
pool: id,
pool: pool,
barcode: barcode,
output: p,
lane: lane,
pair_end: pair_end,
_meta: [ join_id: id ]
lane_sorting: state.lane_sorting,
_meta: [ join_id: pool ]
]
]
}
}
// Group the outputs from across lanes
| groupTuple(by: 0, sort: "hash")
| map {id, states ->
| groupTuple(sort: "hash")
| map {_, states ->
def r1_output = states.findAll{ it.pair_end == "R1" }.collect{it.output}
def r2_output = states.findAll{ it.pair_end == "R2" }.collect{it.output}
def lane_sorting_r1 = states.findAll{ it.pair_end == "R1" }.collect{it.lane_sorting}
def lane_sorting_r2 = states.findAll{ it.pair_end == "R2" }.collect{it.lane_sorting}
// At this point, the lane_sorting hold the positios the items in r1_output and r2_output
// should become in a new list.
def r1_output_sorted = new ArrayList(r1_output.size())
def r2_output_sorted = new ArrayList(r2_output.size())
lane_sorting_r1.eachWithIndex { pos, index ->
r1_output_sorted[pos] = r1_output[index]
}
lane_sorting_r2.eachWithIndex { pos, index ->
r2_output_sorted[pos] = r2_output[index]
}
assert r1_output.size() == r2_output.size()
// Here we pick the state from the first item in the list of states
// and overwrite the keys which are different across states
// TODO: we can assert that these keys are the same
def first_state = states[0]
def new_id = first_state.pool + "__" + first_state.barcode
def new_state = first_state + ["output_r1": r1_output, "output_r2": r2_output]
def new_state = first_state + ["output_r1": r1_output_sorted, "output_r2": r2_output_sorted]
[new_id, new_state]
}
// TODO: Expand this into matching a whitelist/blacklist of barcodes
// ... and turn into separate component
| filter{ id, state -> state.barcode != "unknown" }
| concat_text.run(
directives: [label: ["lowmem", "lowcpu"]],
key: "concat_txt_r1",
runIf: {id, state -> state.output_r1.size() > 1},
fromState: { id, state ->
[
input: state.output_r1,
gzip_output: false,
output: "${id}_R1.fastq"
]
},
toState: { id, result, state ->
def newState = state + [ output_r1: [ result.output ] ]
return newState
}
)
| concat_text.run(
directives: [label: ["lowmem", "lowcpu"]],
key: "concat_text_r2",
runIf: {id, state -> state.output_r2.size() > 1},
fromState: { id, state ->
[
input: state.output_r2,
gzip_output: false,
output: "${id}_R2.fastq",
]
},
toState: { id, result, state ->
def newState = state + [ output_r2: [ result.output ] ]
return newState
}
)
| setState(["pool", "barcode", "lane", "_meta", "output_r1", "output_r2"])
emit:

View File

@@ -1,26 +1,88 @@
executor {
$k8s {
submitRateLimit = '10sec'
pollInterval = '1 sec'
}
}
process {
// Default resources for components that hardly do any processing
memory = { 2.GB * task.attempt }
cpus = 1
container = 'nextflow/bash:latest'
// default resources
memory = { 8.Gb * task.attempt }
cpus = 8
maxForks = 36
// Retry for exit codes that have something to do with memory issues
errorStrategy = { task.exitStatus in 137..140 ? 'retry' : 'terminate' }
maxRetries = 3
maxMemory = null
maxMemory = 192.GB
// Resource labels
withLabel: singlecpu { cpus = 1 }
withLabel: lowcpu { cpus = 4 }
withLabel: midcpu { cpus = 10 }
withLabel: highcpu { cpus = 20 }
withLabel: verylowcpu { cpus = 2 }
withLabel: lowcpu { cpus = 8 }
withLabel: midcpu { cpus = 16 }
withLabel: highcpu { cpus = 32 }
withLabel: lowmem { memory = { get_memory( 4.GB * task.attempt ) } }
withLabel: midmem { memory = { get_memory( 25.GB * task.attempt ) } }
withLabel: highmem { memory = { get_memory( 50.GB * task.attempt ) } }
withLabel: veryhighmem { memory = { get_memory( 75.GB * task.attempt ) } }
withLabel: verylowmem { memory = { get_memory( 4.GB * task.attempt ) } }
withLabel: lowmem { memory = { get_memory( 8.GB * task.attempt ) } }
withLabel: midmem { memory = { get_memory( 16.GB * task.attempt ) } }
withLabel: highmem { memory = { get_memory( 64.GB * task.attempt ) } }
}
profiles {
// detect tempdir
tempDir = java.nio.file.Paths.get(
System.getenv('NXF_TEMP') ?:
System.getenv('VIASH_TEMP') ?:
System.getenv('TEMPDIR') ?:
System.getenv('TMPDIR') ?:
'/tmp'
).toAbsolutePath()
mount_temp {
docker.temp = tempDir
podman.temp = tempDir
charliecloud.temp = tempDir
}
no_publish {
process {
withName: '.*' {
publishDir = [
enabled: false
]
}
}
}
docker {
docker.fixOwnership = true
docker.enabled = true
// docker.userEmulation = true
singularity.enabled = false
podman.enabled = false
shifter.enabled = false
charliecloud.enabled = false
}
local {
// This config is for local processing.
process {
maxMemory = 25.GB
withLabel: verylowcpu { cpus = 2 }
withLabel: lowcpu { cpus = 4 }
withLabel: midcpu { cpus = 6 }
withLabel: highcpu { cpus = 12 }
withLabel: lowmem { memory = { get_memory( 8.GB * task.attempt ) } }
withLabel: midmem { memory = { get_memory( 12.GB * task.attempt ) } }
withLabel: highmem { memory = { get_memory( 20.GB * task.attempt ) } }
}
}
}
def get_memory(to_compare) {
if (!process.containsKey("maxMemory") || !process.maxMemory) {
return to_compare

View File

@@ -17,8 +17,8 @@
"input_r1": {
"type":
"string",
"description": "Type: `file`, required. R1",
"help_text": "Type: `file`, required. R1"
"description": "Type: List of `file`, required, multiple_sep: `\";\"`. Forward reads in FASTQ format",
"help_text": "Type: List of `file`, required, multiple_sep: `\";\"`. Forward reads in FASTQ format. Multiple files can be provided which will\nbe demultiplexed separately before joining the results for each individual well.\n"
}
@@ -27,8 +27,8 @@
"input_r2": {
"type":
"string",
"description": "Type: `file`, required. R2",
"help_text": "Type: `file`, required. R2"
"description": "Type: List of `file`, required, multiple_sep: `\";\"`. Reverse reads in FASTQ format",
"help_text": "Type: List of `file`, required, multiple_sep: `\";\"`. Reverse reads in FASTQ format. Multiple files can be provided which will\nbe demultiplexed separately before joining the results for each individual well.\n"
}