Build branch demultiplex/v0.6 with version v0.6.0 to demultiplex on branch v0.6 (b9fa548)
Build pipeline: viash-hub.demultiplex.v0.6-6mvvl
Source commit: b9fa5488ac
Source message: Bump version to v0.6.0
This commit is contained in:
98
src/config/labels.config
Normal file
98
src/config/labels.config
Normal file
@@ -0,0 +1,98 @@
|
||||
process {
|
||||
container = 'nextflow/bash:latest'
|
||||
|
||||
// default resources
|
||||
memory = { 8.Gb * task.attempt }
|
||||
cpus = 8
|
||||
maxForks = 36
|
||||
|
||||
// Retry for exit codes that have something to do with memory issues
|
||||
errorStrategy = { task.exitStatus in 137..140 ? 'retry' : 'terminate' }
|
||||
maxRetries = 3
|
||||
maxMemory = 192.GB
|
||||
|
||||
// Resource labels
|
||||
withLabel: verylowcpu { cpus = 2 }
|
||||
withLabel: lowcpu { cpus = 8 }
|
||||
withLabel: midcpu { cpus = 16 }
|
||||
withLabel: highcpu { cpus = 32 }
|
||||
|
||||
withLabel: verylowmem { memory = { get_memory( 4.GB * task.attempt ) } }
|
||||
withLabel: lowmem { memory = { get_memory( 8.GB * task.attempt ) } }
|
||||
withLabel: midmem { memory = { get_memory( 16.GB * task.attempt ) } }
|
||||
withLabel: highmem { memory = { get_memory( 64.GB * task.attempt ) } }
|
||||
|
||||
}
|
||||
|
||||
profiles {
|
||||
// detect tempdir
|
||||
tempDir = java.nio.file.Paths.get(
|
||||
System.getenv('NXF_TEMP') ?:
|
||||
System.getenv('VIASH_TEMP') ?:
|
||||
System.getenv('TEMPDIR') ?:
|
||||
System.getenv('TMPDIR') ?:
|
||||
'/tmp'
|
||||
).toAbsolutePath()
|
||||
|
||||
mount_temp {
|
||||
docker.temp = tempDir
|
||||
podman.temp = tempDir
|
||||
charliecloud.temp = tempDir
|
||||
}
|
||||
|
||||
no_publish {
|
||||
process {
|
||||
withName: '.*' {
|
||||
publishDir = [
|
||||
enabled: false
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
docker {
|
||||
docker.fixOwnership = true
|
||||
docker.enabled = true
|
||||
// docker.userEmulation = true
|
||||
singularity.enabled = false
|
||||
podman.enabled = false
|
||||
shifter.enabled = false
|
||||
charliecloud.enabled = false
|
||||
}
|
||||
|
||||
local {
|
||||
// This config is for local processing.
|
||||
process {
|
||||
maxMemory = 25.GB
|
||||
withLabel: verylowcpu { cpus = 2 }
|
||||
withLabel: lowcpu { cpus = 4 }
|
||||
withLabel: midcpu { cpus = 6 }
|
||||
withLabel: highcpu { cpus = 12 }
|
||||
|
||||
withLabel: lowmem { memory = { get_memory( 8.GB * task.attempt ) } }
|
||||
withLabel: midmem { memory = { get_memory( 12.GB * task.attempt ) } }
|
||||
withLabel: highmem { memory = { get_memory( 20.GB * task.attempt ) } }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Clamp a requested memory amount to the configured `process.maxMemory`.
 *
 * Used by the `withLabel: *mem` selectors in this file, which pass in
 * `<base amount> * task.attempt`.
 *
 * @param to_compare the requested amount of memory (compared through `compareTo`
 *                   against `process.maxMemory` cast to `nextflow.util.MemoryUnit`).
 * @return `to_compare` when no `process.maxMemory` is configured or when the request
 *         does not exceed it; otherwise `process.maxMemory`. On the attempt whose
 *         number equals `process.maxRetries`, `process.maxMemory` is returned
 *         regardless of the request.
 *
 * If anything goes wrong while evaluating the limits, an error is printed and the
 * JVM is terminated with exit code 1.
 */
def get_memory(to_compare) {
  // Without a configured ceiling there is nothing to clamp against.
  if (!process.containsKey("maxMemory") || !process.maxMemory) {
    return to_compare
  }

  try {
    // NOTE(review): `task` is not a parameter of this function; this relies on it
    // being resolvable from the calling directive closure -- confirm.
    if (process.containsKey("maxRetries") && process.maxRetries && task.attempt == (process.maxRetries as int)) {
      // Attempt number equals maxRetries: hand out everything that is allowed.
      return process.maxMemory
    }
    else if (to_compare.compareTo(process.maxMemory as nextflow.util.MemoryUnit) > 0) {
      // The request exceeds the ceiling: cap it. (This branch used to reference an
      // undefined `max_memory` variable, which always ended up in the catch below.)
      return process.maxMemory as nextflow.util.MemoryUnit
    }
    else {
      return to_compare
    }
  } catch (all) {
    println "Error processing memory resources. Please check that process.maxMemory '${process.maxMemory}' and process.maxRetries '${process.maxRetries}' are valid!"
    System.exit(1)
  }
}
|
||||
48
src/dataflow/combine_samples/config.vsh.yaml
Normal file
48
src/dataflow/combine_samples/config.vsh.yaml
Normal file
@@ -0,0 +1,48 @@
|
||||
name: combine_samples
|
||||
namespace: dataflow
|
||||
description: Combine fastq files from across samples into one event with a list of fastq files per orientation.
|
||||
argument_groups:
|
||||
- name: Input arguments
|
||||
arguments:
|
||||
- name: "--id"
|
||||
description: "ID of the new event"
|
||||
type: string
|
||||
required: true
|
||||
- name: --forward_input
|
||||
type: file
|
||||
required: true
|
||||
multiple: true
|
||||
- name: --reverse_input
|
||||
type: file
|
||||
required: false
|
||||
multiple: true
|
||||
- name: "--sample_qc_dir"
|
||||
type: file
|
||||
required: true
|
||||
- name: Output arguments
|
||||
arguments:
|
||||
- name: --output_forward
|
||||
type: file
|
||||
direction: output
|
||||
multiple: true
|
||||
required: true
|
||||
- name: --output_reverse
|
||||
type: file
|
||||
direction: output
|
||||
multiple: true
|
||||
required: false
|
||||
- name: "--output_sample_qc"
|
||||
type: file
|
||||
direction: output
|
||||
required: true
|
||||
multiple: true
|
||||
resources:
|
||||
- type: nextflow_script
|
||||
path: main.nf
|
||||
entrypoint: run_wf
|
||||
|
||||
runners:
|
||||
- type: nextflow
|
||||
|
||||
engines:
|
||||
- type: native
|
||||
30
src/dataflow/combine_samples/main.nf
Normal file
30
src/dataflow/combine_samples/main.nf
Normal file
@@ -0,0 +1,30 @@
|
||||
// Merge the per-sample events of a run into a single event that carries the
// forward/reverse fastq files and the QC directories of all samples.
workflow run_wf {
  take:
    input_ch

  main:
    output_ch = input_ch
      // Re-key every event on the ID stored in its state; the original event ID
      // is kept under `_meta.join_id`.
      | map { id, state ->
        [state.id, state + ["_meta": ["join_id": id]]]
      }
      | groupTuple(by: 0, sort: "hash")
      | map { run_id, states ->
        // Collect the per-sample values into one list per output argument.
        // Samples without reverse reads (null `reverse_input`) are dropped.
        def all_forward = states.collect { sample_state -> sample_state.forward_input }.flatten()
        def all_reverse = states
          .collect { sample_state -> sample_state.reverse_input }
          .findAll { reverse_entry -> reverse_entry != null }
          .flatten()
        def all_qc_dirs = states.collect { sample_state -> sample_state.sample_qc_dir }

        // The join ID is the same across all samples from the same run,
        // so it is taken from the first state.
        def join_id = states[0]._meta.join_id

        [
          run_id,
          [
            "output_forward": all_forward,
            "output_reverse": all_reverse,
            "output_sample_qc": all_qc_dirs,
            "_meta": ["join_id": join_id]
          ]
        ]
      }

  emit:
    output_ch
}
|
||||
49
src/dataflow/gather_fastqs_and_validate/config.vsh.yaml
Normal file
49
src/dataflow/gather_fastqs_and_validate/config.vsh.yaml
Normal file
@@ -0,0 +1,49 @@
|
||||
name: gather_fastqs_and_validate
|
||||
namespace: dataflow
|
||||
description: |
|
||||
From a directory containing fastq files, gather the files per sample
|
||||
and validate according to the contents of the sample sheet.
|
||||
argument_groups:
|
||||
- name: Input arguments
|
||||
arguments:
|
||||
- name: --input
|
||||
description: Directory containing .fastq files
|
||||
type: file
|
||||
required: true
|
||||
- name: --sample_sheet
|
||||
description: Sample sheet
|
||||
type: file
|
||||
required: true
|
||||
- name: Output arguments
|
||||
arguments:
|
||||
- name: --fastq_forward
|
||||
type: file
|
||||
direction: output
|
||||
required: true
|
||||
multiple: true
|
||||
- name: "--fastq_reverse"
|
||||
type: file
|
||||
direction: output
|
||||
required: false
|
||||
multiple: true
|
||||
resources:
|
||||
- type: nextflow_script
|
||||
path: main.nf
|
||||
entrypoint: run_wf
|
||||
test_resources:
|
||||
- type: nextflow_script
|
||||
path: test.nf
|
||||
entrypoint: test_gather_and_validate
|
||||
- type: nextflow_script
|
||||
path: test.nf
|
||||
entrypoint: test_undetermined_empty
|
||||
- type: nextflow_script
|
||||
path: test.nf
|
||||
entrypoint: test_without_index
|
||||
- path: test_data
|
||||
|
||||
runners:
|
||||
- type: nextflow
|
||||
|
||||
engines:
|
||||
- type: native
|
||||
33
src/dataflow/gather_fastqs_and_validate/integration_tests.sh
Executable file
33
src/dataflow/gather_fastqs_and_validate/integration_tests.sh
Executable file
@@ -0,0 +1,33 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
set -eo pipefail
|
||||
|
||||
# get the root of the directory
|
||||
REPO_ROOT=$(git rev-parse --show-toplevel)
|
||||
|
||||
# ensure that the command below is run from the root of the repository
|
||||
cd "$REPO_ROOT"
|
||||
|
||||
viash ns build --setup cb -q gather_fastqs_and_validate
|
||||
|
||||
nextflow run . \
|
||||
-main-script src/dataflow/gather_fastqs_and_validate/test.nf \
|
||||
-profile docker,no_publish,local \
|
||||
-entry test_gather_and_validate \
|
||||
-c src/config/labels.config \
|
||||
-resume
|
||||
|
||||
nextflow run . \
|
||||
-main-script src/dataflow/gather_fastqs_and_validate/test.nf \
|
||||
-profile docker,no_publish,local \
|
||||
-entry test_undetermined_empty \
|
||||
-c src/config/labels.config \
|
||||
-resume
|
||||
|
||||
nextflow run . \
|
||||
-main-script src/dataflow/gather_fastqs_and_validate/test.nf \
|
||||
-profile docker,no_publish,local \
|
||||
-entry test_without_index \
|
||||
-c src/config/labels.config \
|
||||
-resume
|
||||
|
||||
122
src/dataflow/gather_fastqs_and_validate/main.nf
Normal file
122
src/dataflow/gather_fastqs_and_validate/main.nf
Normal file
@@ -0,0 +1,122 @@
|
||||
import java.util.zip.GZIPInputStream
|
||||
import java.nio.file.Files
|
||||
import java.io.BufferedInputStream
|
||||
|
||||
/**
 * Checks whether a (possibly gzip-compressed) file has no content.
 *
 * @param file_to_check path of the file to inspect.
 * @return true when the file is 0 bytes long, or when it opens as a gzip stream
 *         that decompresses to nothing; false when at least one decompressed byte
 *         can be read, or when the gzip header cannot be read completely
 *         (EOFException while opening the stream).
 */
def is_empty(file_to_check){
  if (file_to_check.size() == 0) {
    return true
  }
  def raw_stream = Files.newInputStream(file_to_check)
  def gz_stream = null
  try {
    try {
      gz_stream = new GZIPInputStream(new BufferedInputStream(raw_stream))
    } catch (java.io.EOFException ex) {
      // The file ended before a complete gzip header could be read:
      // it has content, but it is not a gzip file.
      return false
    }
    // A gzip stream without payload reports end-of-stream on the first read.
    return gz_stream.read() == -1
  } finally {
    // Always release the file handle (and the inflater, when one was created).
    // Closing the gzip stream also closes the wrapped raw stream.
    (gz_stream ?: raw_stream).close()
  }
}
|
||||
|
||||
// For every incoming event: read the sample IDs from the sample sheet, look up the
// matching fastq files in the input directory, validate them, and emit one event
// per sample (plus one for the "Undetermined" reads).
//
// Input state keys read:  `sample_sheet` (CSV file), `input` (directory with fastq files).
// Output per sample:      [sample_id, [fastq_forward: [...], fastq_reverse: [...],
//                                      _meta: [join_id: <original event ID>]]]
workflow run_wf {
  take:
    input_ch

  main:
    output_ch = input_ch
      // Gather input files from BCL convert output folder
      | flatMap { id, state ->
        println "Processing sample sheet: $state.sample_sheet"
        def sample_sheet = state.sample_sheet
        def start_parsing = false
        def sample_id_column_index = null
        def undetermined_sample_name = "Undetermined"
        // The undetermined reads are always looked up, in addition to the listed samples.
        def samples = [undetermined_sample_name]
        def original_id = id

        // Parse sample sheet for sample IDs
        println "Processing run information file ${sample_sheet}"
        // Declared with `def` so that the variable stays local to this closure call.
        def csv_lines = sample_sheet.splitCsv(header: false, sep: ',')
        // `any` is used as a loop with early exit: returning true stops the iteration.
        csv_lines.any { csv_items ->
          if (csv_items.isEmpty() || csv_items[0].startsWith("#")) {
            // skip empty or commented line
            return
          }
          def possible_header = csv_items[0]
          def header = possible_header.find(/\[(.*)\]/){fullmatch, header_name -> header_name}
          if (header) {
            if (start_parsing) {
              // Stop parsing when encountering the next header
              println "Encountered next header '[${header}]', stopping parsing."
              return true
            }
            // [Data], [BCLConvert_Data] for illumina
            // [Samples] or sometimes [SAMPLES] for Element Biosciences
            if (header.toLowerCase() in ["data", "samples", "bclconvert_data"]) {
              println "Found header [${header}], start parsing."
              start_parsing = true
              return
            }
          }
          if (start_parsing) {
            if (sample_id_column_index == null) {
              // The first line after the section header holds the column names.
              println "Looking for sample name column."
              sample_id_column_index = csv_items.findIndexValues{it == "Sample_ID" || it == "SampleName"}
              assert (!sample_id_column_index.isEmpty()):
                "Could not find column 'Sample_ID' (Illumina) or 'SampleName' " +
                "(Element Biosciences) in run information! Found: ${sample_id_column_index}"
              assert sample_id_column_index.size() == 1, "Expected run information file to contain " +
                "a column 'Sample_ID' or 'SampleName', not both. Found: ${sample_id_column_index}"
              sample_id_column_index = sample_id_column_index[0]
              println "Found sample names column '${csv_items[sample_id_column_index]}'."
              return
            }
            def candidate_sample_id = csv_items[sample_id_column_index]
            if (candidate_sample_id?.trim()) { // Don't add empty csv entries.
              samples += csv_items[sample_id_column_index]
            }
          }
          // This return is important! (If 'true' is returned, the parsing stops.)
          return
        }
        assert start_parsing:
          "Sample information file does not contain [Data], [Samples] or [BCLConvert_Data] header!"
        assert samples.size() > 1:
          "Sample information file does not seem to contain any information about the samples!"
        println "Finished processing run information file, found samples: ${samples}."
        println "Looking for fastq files in ${state.input}."
        // List the directory once; the per-sample lookups below filter this list.
        def allfastqs = state.input.listFiles().findAll{it.isFile() && it.name ==~ /^.+\.fastq\.gz$/}
        println "Found ${allfastqs.size()} fastq files, matching them to the following samples: ${samples}."
        def processed_samples = samples.collect { sample_id ->
          // Quote the sample ID so that it is matched literally inside the pattern.
          def quoted_sample_id = java.util.regex.Pattern.quote(sample_id)
          def forward_regex = ~/^${quoted_sample_id}_S(\d+)_(L(\d+)_)?R1_(\d+)\.fastq\.gz$/
          def reverse_regex = ~/^${quoted_sample_id}_S(\d+)_(L(\d+)_)?R2_(\d+)\.fastq\.gz$/
          // Sort is needed here because multiple lanes (_L00*_) might be present and they need to be in the same order in both lists
          def forward_fastq = allfastqs.findAll{it.name ==~ forward_regex}.sort()
          def reverse_fastq = allfastqs.findAll{it.name ==~ reverse_regex}.sort()
          assert forward_fastq && !forward_fastq.isEmpty(): "No forward fastq files were found for sample ${sample_id}. " +
            "All fastq files in directory: ${allfastqs.collect{it.name}}"
          assert (reverse_fastq.isEmpty() || (forward_fastq.size() == reverse_fastq.size())):
            "Expected equal number of forward and reverse fastq files for sample ${sample_id}. " +
            "Found forward: ${forward_fastq} and reverse: ${reverse_fastq}."
          println "Found ${forward_fastq.size()} forward and ${reverse_fastq.size()} reverse " +
            "fastq files for sample ${sample_id}"

          // Empty fastq files are only tolerated for the undetermined reads.
          assert sample_id == undetermined_sample_name || (forward_fastq.every{!is_empty(it)} && reverse_fastq.every{!is_empty(it)}):
            "A fastq file for sample '${sample_id}' appears to be empty!"
          def fastqs_state = [
            "fastq_forward": forward_fastq,
            "fastq_reverse": reverse_fastq,
            "_meta": [ "join_id": original_id ],
          ]
          [sample_id, fastqs_state]
        }
        println "Finished processing sample sheet."
        return processed_samples
      }

  emit:
    output_ch
}
|
||||
10
src/dataflow/gather_fastqs_and_validate/nextflow.config
Normal file
10
src/dataflow/gather_fastqs_and_validate/nextflow.config
Normal file
@@ -0,0 +1,10 @@
|
||||
manifest {
|
||||
nextflowVersion = '!>=20.12.1-edge'
|
||||
}
|
||||
|
||||
params {
|
||||
rootDir = java.nio.file.Paths.get("$projectDir/../../../").toAbsolutePath().normalize().toString()
|
||||
}
|
||||
|
||||
// include common settings
|
||||
includeConfig("${params.rootDir}/src/config/labels.config")
|
||||
86
src/dataflow/gather_fastqs_and_validate/test.nf
Normal file
86
src/dataflow/gather_fastqs_and_validate/test.nf
Normal file
@@ -0,0 +1,86 @@
|
||||
nextflow.enable.dsl=2
|
||||
|
||||
include { gather_fastqs_and_validate } from params.rootDir + "/target/nextflow/dataflow/gather_fastqs_and_validate/main.nf"
|
||||
|
||||
|
||||
workflow test_gather_and_validate {
|
||||
output_ch = Channel.fromList([
|
||||
[
|
||||
id: "run1",
|
||||
input: params.rootDir + "/src/dataflow/gather_fastqs_and_validate/test_data/fastqs",
|
||||
sample_sheet: params.rootDir + "/src/dataflow/gather_fastqs_and_validate/test_data/samplesheet.csv",
|
||||
]
|
||||
])
|
||||
| map { state -> [state.id, state]}
|
||||
| gather_fastqs_and_validate.run(toState: ["fastq_forward", "fastq_reverse"])
|
||||
|
||||
output_ch
|
||||
| toSortedList{a, b -> a[0] <=> b[0]}
|
||||
| view {"Output: $it"}
|
||||
| map {
|
||||
assert it.size() == 3: "Expected three fastq pairs"
|
||||
def first_pair = it[0][1]
|
||||
assert first_pair.fastq_forward.collect{it.name} == ["Undetermined_S1_R1_001.fastq.gz"]
|
||||
assert first_pair.fastq_reverse.collect{it.name} == ["Undetermined_S1_R2_001.fastq.gz"]
|
||||
def second_pair = it[1][1]
|
||||
assert second_pair.fastq_forward.collect{it.name} == ["sample1_S1_L001_R1_001.fastq.gz", "sample1_S1_L002_R1_001.fastq.gz"]
|
||||
assert second_pair.fastq_reverse.collect{it.name} == ["sample1_S1_L001_R2_001.fastq.gz", "sample1_S1_L002_R2_001.fastq.gz"]
|
||||
def undetermined_pair = it[2][1]
|
||||
assert undetermined_pair.fastq_forward.collect{it.name} == ["sample2_S1_L001_R1_001.fastq.gz"]
|
||||
assert undetermined_pair.fastq_reverse.collect{it.name} == ["sample2_S1_L001_R2_001.fastq.gz"]
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
workflow test_undetermined_empty {
|
||||
output_ch = Channel.fromList([
|
||||
[
|
||||
id: "run1",
|
||||
input: params.rootDir + "/src/dataflow/gather_fastqs_and_validate/test_data/fastqs_undetermined_empty",
|
||||
sample_sheet: params.rootDir + "/src/dataflow/gather_fastqs_and_validate/test_data/samplesheet.csv",
|
||||
]
|
||||
])
|
||||
| map { state -> [state.id, state]}
|
||||
| gather_fastqs_and_validate.run(toState: ["fastq_forward", "fastq_reverse"])
|
||||
|
||||
output_ch
|
||||
| toSortedList{a, b -> a[0] <=> b[0]}
|
||||
| view {"Output: $it"}
|
||||
| map {
|
||||
assert it.size() == 3: "Expected three fastq pairs"
|
||||
def first_pair = it[0][1]
|
||||
assert first_pair.fastq_forward.collect{it.name} == ["Undetermined_S1_R1_001.fastq.gz"]
|
||||
assert first_pair.fastq_reverse.collect{it.name} == ["Undetermined_S1_R2_001.fastq.gz"]
|
||||
def second_pair = it[1][1]
|
||||
assert second_pair.fastq_forward.collect{it.name} == ["sample1_S1_L001_R1_001.fastq.gz", "sample1_S1_L002_R1_001.fastq.gz"]
|
||||
assert second_pair.fastq_reverse.collect{it.name} == ["sample1_S1_L001_R2_001.fastq.gz", "sample1_S1_L002_R2_001.fastq.gz"]
|
||||
def undetermined_pair = it[2][1]
|
||||
assert undetermined_pair.fastq_forward.collect{it.name} == ["sample2_S1_L001_R1_001.fastq.gz"]
|
||||
assert undetermined_pair.fastq_reverse.collect{it.name} == ["sample2_S1_L001_R2_001.fastq.gz"]
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
workflow test_without_index {
|
||||
output_ch = Channel.fromList([
|
||||
[
|
||||
id: "run1",
|
||||
input: params.rootDir + "/src/dataflow/gather_fastqs_and_validate/test_data/fastqs_undetermined_empty",
|
||||
sample_sheet: params.rootDir + "/src/dataflow/gather_fastqs_and_validate/test_data/samplesheet_no_index.csv",
|
||||
]
|
||||
])
|
||||
| map { state -> [state.id, state]}
|
||||
| gather_fastqs_and_validate.run(toState: ["fastq_forward", "fastq_reverse"])
|
||||
|
||||
output_ch
|
||||
| toSortedList{a, b -> a[0] <=> b[0]}
|
||||
| view {"Output: $it"}
|
||||
| map {
|
||||
assert it.size() == 2: "Expected two fastq pairs"
|
||||
def first_pair = it[0][1]
|
||||
assert first_pair.fastq_forward.collect{it.name} == ["Undetermined_S1_R1_001.fastq.gz"]
|
||||
assert first_pair.fastq_reverse.collect{it.name} == ["Undetermined_S1_R2_001.fastq.gz"]
|
||||
|
||||
}
|
||||
}
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,11 @@
|
||||
[foo],,,,
|
||||
[somecontent],,,,
|
||||
bar,lorem,,,
|
||||
,,,,
|
||||
# Comment
|
||||
|
||||
[BCLConvert_Data],,,,
|
||||
Sample_ID,Index,Index2,,
|
||||
sample1,GTAGCCCTGT,GAGCATCTAT,,
|
||||
sample2,TCGGCTCTAC,CCGATGGTCT,,
|
||||
,,,,
|
||||
|
@@ -0,0 +1,10 @@
|
||||
[foo],,,,
|
||||
[somecontent],,,,
|
||||
bar,lorem,,,
|
||||
,,,,
|
||||
# Comment
|
||||
|
||||
[BCLConvert_Data],,,,
|
||||
Sample_ID,Index,Index2,,
|
||||
sample1,,,,
|
||||
,,,,
|
||||
|
115
src/demultiplex/config.vsh.yaml
Normal file
115
src/demultiplex/config.vsh.yaml
Normal file
@@ -0,0 +1,115 @@
|
||||
name: demultiplex
|
||||
description: Demultiplexing of raw sequencing data
|
||||
argument_groups:
|
||||
- name: Input arguments
|
||||
arguments:
|
||||
- name: --id
|
||||
description: Unique identifier for the run
|
||||
type: string
|
||||
- name: --input
|
||||
description: Directory containing raw sequencing data
|
||||
type: file
|
||||
required: true
|
||||
- name: --run_information
|
||||
description: |
|
||||
CSV file containing sample information, which will be used as
|
||||
input for the demultiplexer. Canonically called 'SampleSheet.csv' (Illumina)
|
||||
or 'RunManifest.csv' (Element Biosciences). If not specified,
|
||||
will try to autodetect the sample sheet in the input directory.
|
||||
Requires --demultiplexer to be set.
|
||||
type: file
|
||||
required: false
|
||||
- name: "--demultiplexer"
|
||||
type: string
|
||||
required: false
|
||||
choices: ["bases2fastq", "bclconvert"]
|
||||
description: |
|
||||
Demultiplexer to use, choice depends on the provider
|
||||
of the instrument that was used to generate the data.
|
||||
When not using --run_information, specifying this argument is not
|
||||
required.
|
||||
- name: Output arguments
|
||||
arguments:
|
||||
- name: --output
|
||||
description: Directory to write fastq data to
|
||||
type: file
|
||||
direction: output
|
||||
required: false
|
||||
default: "$id/fastq"
|
||||
- name: "--output_sample_qc"
|
||||
description: Directory to write FastQC output to
|
||||
type: file
|
||||
direction: output
|
||||
required: false
|
||||
multiple: true
|
||||
default: "$id/qc/fastqc"
|
||||
- name: "--multiqc_output"
|
||||
description: Location where to write MultiQC output to
|
||||
type: file
|
||||
direction: output
|
||||
required: false
|
||||
default: "$id/qc/multiqc_report.html"
|
||||
- name: "--output_run_information"
|
||||
type: file
|
||||
direction: "output"
|
||||
required: true
|
||||
default: "$id/run_information.csv"
|
||||
- name: "--demultiplexer_logs"
|
||||
type: file
|
||||
direction: output
|
||||
required: true
|
||||
default: "$id/demultiplexer_logs"
|
||||
- name: "Other arguments"
|
||||
arguments:
|
||||
- name: --skip_copycomplete_check
|
||||
type: boolean_true
|
||||
description: |
|
||||
Disable the check for the presence of a "CopyComplete.txt" file in input
|
||||
directory in case of Illumina data.
|
||||
|
||||
resources:
|
||||
- type: nextflow_script
|
||||
path: main.nf
|
||||
entrypoint: run_wf
|
||||
|
||||
test_resources:
|
||||
- type: nextflow_script
|
||||
path: test.nf
|
||||
entrypoint: test_illumina
|
||||
- type: nextflow_script
|
||||
path: test.nf
|
||||
entrypoint: test_bases2fastq
|
||||
- type: nextflow_script
|
||||
path: test.nf
|
||||
entrypoint: test_no_index
|
||||
|
||||
dependencies:
|
||||
- name: io/untar
|
||||
repository: local
|
||||
- name: dataflow/gather_fastqs_and_validate
|
||||
repository: local
|
||||
- name: io/interop_summary_to_csv
|
||||
repository: local
|
||||
- name: dataflow/combine_samples
|
||||
repository: local
|
||||
- name: bcl_convert
|
||||
repository: bb
|
||||
- name: bases2fastq
|
||||
repository: bb
|
||||
- name: fastqc
|
||||
repository: bb
|
||||
- name: multiqc
|
||||
repository: bb
|
||||
- name: detect_demultiplexer
|
||||
repository: local
|
||||
repositories:
|
||||
- name: bb
|
||||
type: vsh
|
||||
repo: biobox
|
||||
tag: v0.4.0
|
||||
|
||||
runners:
|
||||
- type: nextflow
|
||||
|
||||
engines:
|
||||
- type: native
|
||||
31
src/demultiplex/integration_tests.sh
Executable file
31
src/demultiplex/integration_tests.sh
Executable file
@@ -0,0 +1,31 @@
|
||||
#!/usr/bin/env bash

# Builds the demultiplex workflow and runs its Nextflow test entrypoints
# (test_illumina, test_bases2fastq, test_no_index) from the repository root.

# Abort on the first failing command (including failures inside pipelines), so that
# a failed build or test run is not followed by further runs.
set -eo pipefail

# get the root of the directory
REPO_ROOT=$(git rev-parse --show-toplevel)

# ensure that the command below is run from the root of the repository
cd "$REPO_ROOT"

viash ns build --setup cb -q demultiplex

nextflow run . \
  -main-script src/demultiplex/test.nf \
  -profile docker,no_publish,local \
  -entry test_illumina \
  -c src/config/labels.config \
  --resources_test https://raw.githubusercontent.com/nf-core/test-datasets/demultiplex/testdata/NovaSeq6000/ \
  -resume

nextflow run . \
  -main-script src/demultiplex/test.nf \
  -profile docker,no_publish,local \
  -entry test_bases2fastq \
  -c src/config/labels.config \
  -resume

nextflow run . \
  -main-script src/demultiplex/test.nf \
  -profile docker,no_publish,local \
  -entry test_no_index \
  -c src/config/labels.config \
  -resume
|
||||
184
src/demultiplex/main.nf
Normal file
184
src/demultiplex/main.nf
Normal file
@@ -0,0 +1,184 @@
|
||||
// Demultiplexing workflow: optionally unpacks the input, detects which demultiplexer
// to use, runs it (bcl_convert or bases2fastq), gathers and validates the resulting
// fastq files, runs FastQC, recombines the sample events and builds a MultiQC report.
workflow run_wf {
  take:
    input_ch

  main:
    samples_ch = input_ch

      // Unpack the input first when it is a tarball.
      | untar.run(
        directives: [label: ["lowmem", "lowcpu"]],
        runIf: { id, state ->
          def input_name = state.input.toString()
          [".tar.gz", ".tar", ".tgz"].any { suffix -> input_name.endsWith(suffix) }
        },
        fromState: ["input": "input"],
        toState: { id, result, state -> state + ["input": result.output] }
      )

      // Work out which demultiplexer and which run information file to use.
      | detect_demultiplexer.run(
        fromState: [
          "input": "input",
          "run_information": "run_information",
          "demultiplexer": "demultiplexer",
        ],
        toState: { id, result, state ->
          def detected = [
            "demultiplexer": result.demultiplexer_output,
            "run_information": result.run_information_output
          ]
          state + detected
        }
      )

      // InterOp summaries are only produced on the bclconvert path.
      | interop_summary_to_csv.run(
        runIf: { id, state -> state.demultiplexer in ["bclconvert"] },
        directives: [label: ["lowmem", "verylowcpu"]],
        fromState: ["input": "input"],
        toState: [
          "interop_run_summary": "output_run_summary",
          "interop_index_summary": "output_index_summary",
        ]
      )

      // Demultiplex with bcl_convert.
      | bcl_convert.run(
        runIf: { id, state -> state.demultiplexer in ["bclconvert"] },
        directives: [label: ["highmem", "midcpu"]],
        fromState: { id, state ->
          [
            bcl_input_directory: state.input,
            sample_sheet: state.run_information,
            output_directory: state.output,
            reports: state.demultiplexer_logs,
            logs: state.demultiplexer_logs,
          ]
        },
        toState: { id, result, state ->
          // The event ID is stored as `run_id` so that it can be used further on
          // (see combine_samples below).
          state + [
            "output_demultiplexer" : result.output_directory,
            "run_id": id,
            "demultiplexer_logs": result.reports,
          ]
        }
      )

      // Demultiplex with bases2fastq.
      | bases2fastq.run(
        runIf: { id, state -> state.demultiplexer in ["bases2fastq"] },
        directives: [label: ["highmem", "midcpu"]],
        fromState: { id, state ->
          [
            "analysis_directory": state.input,
            "run_manifest": state.run_information,
            "output_directory": state.output,
            "report": state.demultiplexer_logs + "/report.html",
            "logs": state.demultiplexer_logs,
          ]
        },
        args: [
          "no_projects": true, // Do not put output files in a subfolder for project
          //"split_lanes": true,
          "legacy_fastq": true, // Illumina style output names
          "group_fastq": true, // No subdir per sample
        ],
        toState: { id, result, state ->
          state + [
            "output_demultiplexer" : result.output_directory,
            "run_id": id,
            "demultiplexer_logs": result.logs,
          ]
        }
      )

      // Collect the fastq files and validate them against the run information.
      | gather_fastqs_and_validate.run(
        fromState: [
          "input": "output_demultiplexer",
          "sample_sheet": "run_information",
        ],
        toState: [
          "fastq_forward": "fastq_forward",
          "fastq_reverse": "fastq_reverse",
        ]
      )

    output_ch = samples_ch
      | fastqc.run(
        directives: [label: ["verylowcpu", "lowmem"]],
        fromState: { id, state ->
          def qc_prefix = "$id/qc/fastqc/*"
          [
            "input": [state.fastq_forward, state.fastq_reverse],
            "html": "${qc_prefix}_fastqc_report.html",
            "summary": "${qc_prefix}_summary.txt",
            "data": "${qc_prefix}_fastqc_data.txt",
          ]
        },
        toState: { id, result, state ->
          // All FastQC outputs above share one directory:
          // take that directory from the first html report.
          def qc_dir = result.html[0].parent
          state + [ "output_sample_qc": qc_dir ]
        }
      )

      | combine_samples.run(
        fromState: { id, state ->
          [
            "id": state.run_id,
            "forward_input": state.fastq_forward,
            "reverse_input": state.fastq_reverse,
            "sample_qc_dir": state.output_sample_qc,
          ]
        },
        toState: [
          "forward_fastqs": "output_forward",
          "reverse_fastqs": "output_reverse",
          "output_sample_qc": "output_sample_qc",
        ]
      )

      | multiqc.run(
        directives: [label: ["midcpu", "midmem"]],
        fromState: { id, state ->
          def report_inputs = state.output_sample_qc
          if (state.demultiplexer == "bclconvert") {
            // Also feed the directories holding the InterOp summaries to MultiQC.
            report_inputs = report_inputs + [
              state.interop_run_summary.getParent(),
              state.interop_index_summary.getParent()
            ]
          }
          [
            "input": report_inputs,
            "output_report": state.multiqc_output,
            "cl_config": 'sp: {fastqc/data: {fn: "*_fastqc_data.txt"}}'
          ]
        },
        toState: { id, result, state ->
          state + [ "multiqc_output" : result.output_report ]
        }
      )

      // Keep only the keys that correspond to the workflow's output arguments.
      | setState(
        [
          //"_meta": "_meta",
          "output": "output_demultiplexer",
          "output_sample_qc": "output_sample_qc",
          "multiqc_output": "multiqc_output",
          "output_run_information": "run_information",
          "demultiplexer_logs": "demultiplexer_logs"
        ]
      )

  emit:
    output_ch
}
|
||||
10
src/demultiplex/nextflow.config
Normal file
10
src/demultiplex/nextflow.config
Normal file
@@ -0,0 +1,10 @@
|
||||
manifest {
|
||||
nextflowVersion = '!>=20.12.1-edge'
|
||||
}
|
||||
|
||||
params {
|
||||
rootDir = java.nio.file.Paths.get("$projectDir/../../").toAbsolutePath().normalize().toString()
|
||||
}
|
||||
|
||||
// include common settings
|
||||
includeConfig("${params.rootDir}/src/config/labels.config")
|
||||
201
src/demultiplex/test.nf
Normal file
201
src/demultiplex/test.nf
Normal file
@@ -0,0 +1,201 @@
|
||||
nextflow.enable.dsl=2
|
||||
|
||||
include { demultiplex } from params.rootDir + "/target/nextflow/demultiplex/main.nf"
|
||||
|
||||
params.resources_test = params.rootDir + "/testData/"
|
||||
|
||||
workflow test_illumina {
|
||||
output_ch = Channel.fromList([
|
||||
[
|
||||
// sample_sheet: resources_test.resolve("bcl_convert_samplesheet.csv"),
|
||||
// input: resources_test.resolve("iseq-DI/"),
|
||||
//sample_sheet: "https://raw.githubusercontent.com/nf-core/test-datasets/demultiplex/testdata/NovaSeq6000/SampleSheet.csv",
|
||||
input: params.resources_test + "200624_A00834_0183_BHMTFYDRXX.tar.gz",
|
||||
publish_dir: "output_dir/",
|
||||
]
|
||||
])
|
||||
| map { state -> [ "run", state ] }
|
||||
| demultiplex.run(
|
||||
toState: { id, output, state ->
|
||||
output + [ orig_input: state.input ] }
|
||||
)
|
||||
| view { output ->
|
||||
assert output.size() == 2 : "outputs should contain two elements; [id, file]"
|
||||
"Output: $output"
|
||||
}
|
||||
|
||||
event_count_ch = output_ch
|
||||
| toSortedList()
|
||||
| map { state ->
|
||||
assert state.size() == 1 : "Expected one event in the output channel"
|
||||
}
|
||||
|
||||
assert_ch = output_ch
|
||||
| map {id, state ->
|
||||
assert state.output.isDirectory(): "Expected bclconvert output to be a directory"
|
||||
state.output_sample_qc.each{
|
||||
assert it.isDirectory(): "Expected sample QC output to be a directory"
|
||||
}
|
||||
assert state.multiqc_output.isFile(): "Expected multiQC output to be a file"
|
||||
// Keep the listed paths themselves: the emptiness check further down needs the
// files on disk, not only their names.
def fastq_paths = state.output.listFiles()
fastq_files = fastq_paths.collect{it.name}
assert ["Undetermined_S0_L001_R1_001.fastq.gz", "Sample23_S3_L001_R1_001.fastq.gz",
        "sampletest_S4_L001_R1_001.fastq.gz", "Sample1_S1_L001_R1_001.fastq.gz",
        "SampleA_S2_L001_R1_001.fastq.gz"].toSet() == fastq_files.toSet(): \
    "Output directory should contain the expected FASTQ files"
// Assert on the size of each file. Calling length() on the collected names only
// measures the length of the file name, which is never 0.
fastq_paths.each{
    assert it.size() != 0: "Expected FASTQ file to not be empty"
}
|
||||
assert state.output_run_information.isFile(): "Expected output run information to be a file"
|
||||
expected_run_information = """[Header]
|
||||
|Date,6/24/2020
|
||||
|Application,Illumina DRAGEN COVIDSeq Test Pipeline
|
||||
|Instrument Type,NovaSeq6000
|
||||
|Assay,Illumina COVIDSeq Test
|
||||
|Index Adapters,IDT-ILMN DNA-RNA UDP Indexes
|
||||
|Chemistry,Amplicon
|
||||
|
||||
|[Settings]
|
||||
|AdapterRead1,CTGTCTCTTATACACATCT
|
||||
|
||||
|[Data]
|
||||
|Lane,Sample_ID,Sample_Type,Index_ID,Index,Index2
|
||||
|1,Sample1,PatientSample,UDP0001,GAACTGAGCG,TCGTGGAGCG
|
||||
|1,SampleA,PatientSample,UDP0002,AGGTCAGATA,CTACAAGATA
|
||||
|1,Sample23,PatientSample,UDP0003,CGTCTCATAT,TATAGTAGCT
|
||||
|1,sampletest,PatientSample,UDP0004,ATTCCATAAG,TGCCTGGTGG
|
||||
|""".stripMargin()
|
||||
assert state.output_run_information.text.replaceAll("\r\n", "\n") == expected_run_information
|
||||
|
||||
println "ID: ${id}"
|
||||
println "State: ${state}"
|
||||
|
||||
assert state.demultiplexer_logs.isDirectory():
|
||||
"Expected BCL Convert reports to be a directory"
|
||||
|
||||
def logs_files = state.demultiplexer_logs.listFiles()
|
||||
println "Logs files: ${logs_files}"
|
||||
assert logs_files.size() > 0: "Expected BCL Convert logs dir to contain files"
|
||||
|
||||
assert logs_files.find { it.name == "Demultiplex_Stats.csv" }:
|
||||
"Expected to find BCL Convert Demultiplex_Stats.csv"
|
||||
assert logs_files.find { it.name == "Logs" }:
|
||||
"Expected to find BCL Convert Logs directory"
|
||||
}
|
||||
}
|
||||
|
||||
workflow test_bases2fastq {
|
||||
output_ch = Channel.fromList([
|
||||
[
|
||||
input: "http://element-public-data.s3.amazonaws.com/bases2fastq-share/bases2fastq-v2/20230404-bases2fastq-sim-151-151-9-9.tar.gz",
|
||||
publish_dir: "output_dir/",
|
||||
]
|
||||
])
|
||||
| map { state -> [ "run", state ] }
|
||||
| demultiplex.run(
|
||||
toState: { id, output, state ->
|
||||
output + [ orig_input: state.input ] }
|
||||
)
|
||||
| view { output ->
|
||||
assert output.size() == 2 : "outputs should contain two elements; [id, file]"
|
||||
"Output: $output"
|
||||
}
|
||||
| map {id, state ->
|
||||
assert state.output.isDirectory(): "Expected bases2fastq output to be a directory"
|
||||
state.output_sample_qc.each{assert it.isDirectory(): "Expected sample QC output to be a directory"}
|
||||
assert state.multiqc_output.isFile(): "Expected multiQC output to be a file"
|
||||
|
||||
def logs_files = state.demultiplexer_logs.listFiles()
|
||||
println "Logs files: ${logs_files}"
|
||||
assert logs_files.size() > 0: "Expected bases2fastq logs dir to contain files"
|
||||
|
||||
assert logs_files.find { it.name == "report.html" } != null:
|
||||
"Expected to find bases2fastq report.html"
|
||||
assert logs_files.find { it.name == "info" }:
|
||||
"Expected to find bases2fastq info directory"
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
workflow test_no_index {
|
||||
// Test what happens when no index is specified. All the reads go into one sample
|
||||
// and the "Undetermined" should be empty
|
||||
output_ch = Channel.fromList([
|
||||
[
|
||||
input: params.resources_test + "demultiplex_htrnaseq_meta/SingleCell-RNA_P3_2",
|
||||
demultiplexer: "bclconvert",
|
||||
run_information: params.resources_test + "demultiplex_htrnaseq_meta/SingleCell-RNA_P3_2/SampleSheetNoIndex.csv"
|
||||
]
|
||||
])
|
||||
| map { state -> [ "run", state ] }
|
||||
| demultiplex.run(
|
||||
toState: { id, output, state ->
|
||||
output + [ orig_input: state.input ] }
|
||||
)
|
||||
| view { output ->
|
||||
assert output.size() == 2 : "outputs should contain two elements; [id, file]"
|
||||
"Output: $output"
|
||||
}
|
||||
|
||||
event_count_ch = output_ch
|
||||
| toSortedList()
|
||||
| map { state ->
|
||||
assert state.size() == 1 : "Expected one event in the output channel"
|
||||
}
|
||||
|
||||
assert_ch = output_ch
|
||||
| map {id, state ->
|
||||
assert state.output.isDirectory(): "Expected bclconvert output to be a directory"
|
||||
state.output_sample_qc.each{
|
||||
assert it.isDirectory(): "Expected sample QC output to be a directory"
|
||||
}
|
||||
assert state.multiqc_output.isFile(): "Expected multiQC output to be a file"
|
||||
// Keep the listed paths themselves: the emptiness check further down needs the
// files on disk, not only their names.
def fastq_paths = state.output.listFiles()
fastq_files = fastq_paths.collect{it.name}
assert ["Undetermined_S0_R2_001.fastq.gz", "Undetermined_S0_R1_001.fastq.gz",
        "SingleCell-RNA-P3-2-SI-TT-A5_S1_R1_001.fastq.gz", "SingleCell-RNA-P3-2-SI-TT-A5_S1_R2_001.fastq.gz"
    ].toSet() == fastq_files.toSet(): \
    "Output directory should contain the expected FASTQ files"
// Assert on the size of each file. Calling length() on the collected names only
// measures the length of the file name, which is never 0.
fastq_paths.each{
    assert it.size() != 0: "Expected FASTQ file to not be empty"
}
|
||||
assert state.output_run_information.isFile(): "Expected output run information to be a file"
|
||||
expected_run_information = """[Header],,,,
|
||||
|FileFormatVersion,2,,,
|
||||
|RunName,SingleCell-RNA_P3_2,,,
|
||||
|InstrumentPlatform,NextSeq1k2k,,,
|
||||
|IndexOrientation,Forward,,,
|
||||
|,,,,
|
||||
|[Reads],,,,
|
||||
|Read1Cycles,28,,,
|
||||
|Read2Cycles,90,,,
|
||||
|Index1Cycles,10,,,
|
||||
|Index2Cycles,10,,,
|
||||
|,,,,
|
||||
|[BCLConvert_Settings],,,,
|
||||
|SoftwareVersion,4.2.7,,,
|
||||
|TrimUMI,0,,,
|
||||
|OverrideCycles,U28;N10;N10;Y90,,,
|
||||
|FastqCompressionFormat,gzip,,,
|
||||
|NoLaneSplitting,TRUE,,,
|
||||
|,,,,
|
||||
|[BCLConvert_Data],,,,
|
||||
|Sample_ID,Index,Index2,,
|
||||
|SingleCell-RNA-P3-2-SI-TT-A5,,,,
|
||||
|,,,,""".stripMargin()
|
||||
assert state.output_run_information.text.replaceAll("\r\n", "\n") == expected_run_information
|
||||
|
||||
println "ID: ${id}"
|
||||
println "State: ${state}"
|
||||
|
||||
assert state.demultiplexer_logs.isDirectory():
|
||||
"Expected BCL Convert reports to be a directory"
|
||||
|
||||
def logs_files = state.demultiplexer_logs.listFiles()
|
||||
println "Logs files: ${logs_files}"
|
||||
assert logs_files.size() > 0: "Expected BCL Convert logs dir to contain files"
|
||||
|
||||
assert logs_files.find { it.name == "Demultiplex_Stats.csv" }:
|
||||
"Expected to find BCL Convert Demultiplex_Stats.csv"
|
||||
assert logs_files.find { it.name == "Logs" }:
|
||||
"Expected to find BCL Convert Logs directory"
|
||||
}
|
||||
}
|
||||
57
src/detect_demultiplexer/config.vsh.yaml
Normal file
57
src/detect_demultiplexer/config.vsh.yaml
Normal file
@@ -0,0 +1,57 @@
|
||||
name: detect_demultiplexer
|
||||
description: |
|
||||
Detects the demultiplexer and accompanying sample information file which can be
|
||||
used to generate the fastq files.
|
||||
arguments:
|
||||
- name: --id
|
||||
description: Unique identifier for the run
|
||||
type: string
|
||||
- name: --input
|
||||
description: Directory containing raw sequencing data
|
||||
type: file
|
||||
required: true
|
||||
- name: --run_information
|
||||
description: |
|
||||
CSV file containing sample information, which will be used as
|
||||
input for the demultiplexer. Canonically called 'SampleSheet.csv' (Illumina)
|
||||
or 'RunManifest.csv' (Element Biosciences). If not specified,
|
||||
will try to autodetect the sample sheet in the input directory.
|
||||
Requires --demultiplexer to be set.
|
||||
type: file
|
||||
required: false
|
||||
- name: "--demultiplexer"
|
||||
type: string
|
||||
required: false
|
||||
choices: ["bases2fastq", "bclconvert"]
|
||||
description: |
|
||||
Demultiplexer to use, choice depends on the provider
|
||||
of the instrument that was used to generate the data.
|
||||
When not using --run_information, specifying this argument is not
|
||||
required.
|
||||
|
||||
- name: --demultiplexer_output
|
||||
description: |
|
||||
Demultiplexer program. The demultiplexer is either provided (with --demultiplexer),
|
||||
or inferred from the contents of the input data.
|
||||
type: string
|
||||
direction: output
|
||||
required: false
|
||||
- name: --run_information_output
|
||||
description: |
|
||||
Sample information that can be used to demultiplex the input data.
|
||||
An appropriate file was either provided (with --run_information), or
|
||||
inferred from the contents of the input data.
|
||||
type: file
|
||||
direction: output
|
||||
required: false
|
||||
|
||||
resources:
|
||||
- type: nextflow_script
|
||||
path: main.nf
|
||||
entrypoint: run_wf
|
||||
|
||||
runners:
|
||||
- type: nextflow
|
||||
|
||||
engines:
|
||||
- type: native
|
||||
96
src/detect_demultiplexer/main.nf
Normal file
96
src/detect_demultiplexer/main.nf
Normal file
@@ -0,0 +1,96 @@
|
||||
workflow run_wf {
|
||||
take:
|
||||
input_ch // Channel with [id, state] pairs
|
||||
|
||||
main:
|
||||
output_ch = input_ch
|
||||
|
||||
// Gather input files from folder
|
||||
| map {id, state ->
|
||||
def newState = [:]
|
||||
println("Provided run information: ${state.run_information} and demultiplexer: ${state.demultiplexer}")
|
||||
// No auto-detection of run information file (it is user provided),
|
||||
// in this case the demultiplexer should also be specified.
|
||||
assert (!state.run_information || state.demultiplexer): "When setting --run_information, " +
|
||||
"you must also provide a demultiplexer"
|
||||
|
||||
if (!state.run_information) {
|
||||
println("Run information was not specified, auto-detecting...")
|
||||
// The supported_platforms hashmap must be a 1-on-1 mapping
|
||||
// Also, its keys must be present in the 'choices' field
|
||||
// for the 'demultiplexer' argument in the viash config.
|
||||
def supported_platforms = [
|
||||
"bclconvert": "SampleSheet.csv", // Illumina
|
||||
"bases2fastq": "RunManifest.csv" // Element Biosciences
|
||||
]
|
||||
def found_sample_information = supported_platforms.collectEntries{demultiplexer, filename ->
|
||||
println("Checking if ${filename} can be found in input folder ${state.input}.")
|
||||
def resolved_filename = state.input.resolve(filename)
|
||||
if (!resolved_filename.isFile()) {
|
||||
resolved_filename = null
|
||||
}
|
||||
println("Result after looking for run information for ${demultiplexer}: ${resolved_filename}.")
|
||||
[demultiplexer, resolved_filename]
|
||||
}
|
||||
def demultiplexer = null
|
||||
def run_information = null
|
||||
found_sample_information.each{demultiplexer_candidate, file_path ->
|
||||
if (file_path) {
|
||||
// At this point, a candidate run information file was found.
|
||||
assert !run_information: "Autodetection of run information " +
|
||||
"(SampleSheet, RunManifest) failed: " +
|
||||
"multiple candidate files found in input folder. " +
|
||||
"Please specify one using --run_information."
|
||||
run_information = file_path
|
||||
demultiplexer = demultiplexer_candidate
|
||||
}
|
||||
}
|
||||
|
||||
// When autodetecting, the run information should have been found
|
||||
assert run_information: "No run information file (SampleSheet, RunManifest) " +
|
||||
"found in input directory."
|
||||
|
||||
// When autodetecting, the demultiplexer must be set if the run information was found
|
||||
assert demultiplexer: "State error: the demultiplexer should have been autodetected. " +
|
||||
"Please report this as a bug."
|
||||
|
||||
// When autodetecting, the found demultiplexer must match
|
||||
// with the demultiplexer that the user has provided (in case it was provided).
|
||||
if (state.demultiplexer) {
    // A user-provided demultiplexer must agree with the one implied by the
    // autodetected run information file. Every message fragment is joined with '+':
    // a line that does not end in an operator terminates the Groovy statement, so
    // the remaining fragments would be dropped from the assertion message.
    assert state.demultiplexer == demultiplexer,
        "Requested to use demultiplexer ${state.demultiplexer} " +
        "but demultiplexer based on the autodetected run information " +
        "file ${run_information} seems to indicate that the demultiplexer " +
        "should be ${demultiplexer}. Either avoid specifying the demultiplexer " +
        "or override the autodetection of the run information by providing " +
        "the file."
}
|
||||
println("Using run information ${run_information} and demultiplexer ${demultiplexer}")
|
||||
// At this point, the autodetected state can override the user provided state.
|
||||
newState = newState + [
|
||||
"run_information": run_information,
|
||||
"demultiplexer": demultiplexer,
|
||||
]
|
||||
} // end auto-detection logic
|
||||
|
||||
// NOTE(review): newState is only populated by the auto-detection branch above, so these
// checks appear to be skipped when the user supplies both --run_information and
// --demultiplexer (newState.demultiplexer is then null) — confirm this is intended.
if (newState.demultiplexer in ["bclconvert"]) {
|
||||
// Do not add InterOp to state because we generate the summary csv's in the next
|
||||
// step based on the run dir, not the InterOp dir.
|
||||
def interop_dir = state.input.resolve("InterOp")
|
||||
assert interop_dir.isDirectory(): "Expected InterOp directory to be present."
|
||||
|
||||
def copycomplete_file = state.input.resolve("CopyComplete.txt")
|
||||
assert (copycomplete_file.isFile() || state.skip_copycomplete_check):
|
||||
"'CopyComplete.txt' file was not found!"
|
||||
}
|
||||
|
||||
def resultState = state + newState
|
||||
[id, resultState]
|
||||
}
|
||||
|
||||
| setState(["demultiplexer_output": "demultiplexer",
|
||||
"run_information_output": "run_information"])
|
||||
|
||||
emit:
|
||||
output_ch
|
||||
}
|
||||
45
src/io/interop_summary_to_csv/config.vsh.yaml
Normal file
45
src/io/interop_summary_to_csv/config.vsh.yaml
Normal file
@@ -0,0 +1,45 @@
|
||||
name: interop_summary_to_csv
|
||||
namespace: io
|
||||
argument_groups:
|
||||
- name: Input arguments
|
||||
arguments:
|
||||
- name: --input
|
||||
description: Sequencing run folder (*not* InterOp folder).
|
||||
type: file
|
||||
required: true
|
||||
- name: Output arguments
|
||||
arguments:
|
||||
- name: --output_run_summary
|
||||
type: file
|
||||
direction: output
|
||||
required: true
|
||||
- name: --output_index_summary
|
||||
type: file
|
||||
direction: output
|
||||
required: true
|
||||
requirements:
|
||||
commands: ["summary", "index-summary"]
|
||||
resources:
|
||||
- type: bash_script
|
||||
path: script.sh
|
||||
test_resources:
|
||||
- type: bash_script
|
||||
path: test.sh
|
||||
- path: /testData/iseq-DI
|
||||
engines:
|
||||
- type: docker
|
||||
image: debian:stable-slim
|
||||
setup:
|
||||
- type: apt
|
||||
packages:
|
||||
- procps
|
||||
- wget
|
||||
- type: docker
|
||||
run: |
|
||||
wget https://github.com/Illumina/interop/releases/download/v1.3.1/interop-1.3.1-Linux-GNU.tar.gz -O /tmp/interop.tar.gz && \
|
||||
tar -C /tmp/ --no-same-owner --no-same-permissions -xvf /tmp/interop.tar.gz && \
|
||||
mv /tmp/interop-1.3.1-Linux-GNU/bin/index-summary /tmp/interop-1.3.1-Linux-GNU/bin/summary /usr/local/bin/
|
||||
|
||||
runners:
|
||||
- type: executable
|
||||
- type: nextflow
|
||||
10
src/io/interop_summary_to_csv/script.sh
Normal file
10
src/io/interop_summary_to_csv/script.sh
Normal file
@@ -0,0 +1,10 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
set -eo pipefail
|
||||
|
||||
if [ ! -d "$par_input" ]; then
|
||||
echo "Input directory does not exist or is not a directory"
|
||||
exit 1
|
||||
fi
|
||||
$(which summary) --csv=1 "$par_input" 1> "$par_output_run_summary"
|
||||
$(which index-summary) --csv=1 "$par_input" 1> "$par_output_index_summary"
|
||||
18
src/io/interop_summary_to_csv/test.sh
Normal file
18
src/io/interop_summary_to_csv/test.sh
Normal file
@@ -0,0 +1,18 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
set -eo pipefail
|
||||
|
||||
# create tempdir
|
||||
echo ">>> Creating temporary test directory."
|
||||
TMPDIR=$(mktemp -d "$meta_temp_dir/$meta_functionality_name-XXXXXX")
|
||||
function clean_up {
|
||||
[[ -d "$TMPDIR" ]] && rm -r "$TMPDIR"
|
||||
}
|
||||
trap clean_up EXIT
|
||||
echo ">>> Created temporary directory '$TMPDIR'."
|
||||
|
||||
echo ">>> Run simple execution"
|
||||
./$meta_functionality_name \
|
||||
--input "$meta_resources_dir/iseq-DI" \
|
||||
--output_run_summary "$TMPDIR/run_summary.csv" \
|
||||
--output_index_summary "$TMPDIR/index_summary.csv"
|
||||
34
src/io/publish/code.sh
Executable file
34
src/io/publish/code.sh
Executable file
@@ -0,0 +1,34 @@
|
||||
#!/bin/bash
|
||||
|
||||
set -eo pipefail
|
||||
|
||||
declare -A input_output_mapping=(["par_input"]="par_output"
|
||||
["par_input_multiqc"]="par_output_multiqc"
|
||||
["par_input_run_information"]="par_output_run_information"
|
||||
["par_input_demultiplexer_logs"]="par_output_demultiplexer_logs"
|
||||
)
|
||||
|
||||
for input_argument_name in "${!input_output_mapping[@]}"
|
||||
do
|
||||
input_location="${!input_argument_name}"
|
||||
output_argument_name="${input_output_mapping[$input_argument_name]}"
|
||||
output_location="${!output_argument_name}"
|
||||
echo "Publishing $input_location -> $output_location"
|
||||
|
||||
echo "Creating directory if it does not exist."
|
||||
# Quote the command substitution so a parent path containing spaces stays one argument.
mkdir -p "$(dirname "$output_location")" && echo "Containing directory $output_location created"
|
||||
|
||||
echo "Copying files..."
|
||||
cp -rL "$input_location" "$output_location"
|
||||
|
||||
echo "Output files for $output_location:"
|
||||
ls "$output_location"
|
||||
done
|
||||
|
||||
# Collect the files found directly inside each sample QC input directory into one output directory.
echo "Grouping output from $par_input_sample_qc into $par_output_sample_qc"
mkdir -p "$par_output_sample_qc"
# The input directories are split on ';'. The here-string is quoted so the value
# reaches 'read' verbatim.
IFS=";" read -ra sample_qc_inputs <<< "$par_input_sample_qc"
for qc_dir in "${sample_qc_inputs[@]}"; do
  echo "Copying contents of $qc_dir"
  # -maxdepth is a global option and has to precede tests such as -type,
  # otherwise GNU find prints a warning.
  find -H -D exec "$qc_dir" -maxdepth 1 -type f -exec cp -t "$par_output_sample_qc" {} +
done
|
||||
64
src/io/publish/config.vsh.yaml
Normal file
64
src/io/publish/config.vsh.yaml
Normal file
@@ -0,0 +1,64 @@
|
||||
name: "publish"
|
||||
namespace: "io"
|
||||
description: "Publish the processed results of the run"
|
||||
argument_groups:
|
||||
- name: Input arguments
|
||||
arguments:
|
||||
- name: --input
|
||||
description: Directory containing the fastq data to publish
|
||||
type: file
|
||||
required: true
|
||||
- name: "--input_sample_qc"
|
||||
description: Directories containing the sample QC output to publish
|
||||
type: file
|
||||
required: true
|
||||
multiple: true
|
||||
- name: "--input_multiqc"
|
||||
description: Location of the MultiQC report to publish.
|
||||
type: file
|
||||
required: true
|
||||
- name: "--input_run_information"
|
||||
description: "Location of the run information file to publish."
|
||||
type: file
|
||||
required: true
|
||||
- name: "--input_demultiplexer_logs"
|
||||
type: file
|
||||
required: true
|
||||
- name: Output arguments
|
||||
arguments:
|
||||
- name: --output
|
||||
type: file
|
||||
direction: output
|
||||
default: "fastq"
|
||||
- name: --output_sample_qc
|
||||
type: file
|
||||
direction: output
|
||||
default: "qc/fastqc"
|
||||
- name: --output_multiqc
|
||||
type: file
|
||||
direction: output
|
||||
default: "qc/multiqc_report.html"
|
||||
- name: --output_run_information
|
||||
type: file
|
||||
direction: output
|
||||
default: run_information.csv
|
||||
- name: "--output_demultiplexer_logs"
|
||||
type: file
|
||||
direction: output
|
||||
default: "demultiplexer_logs"
|
||||
|
||||
resources:
|
||||
- type: bash_script
|
||||
path: ./code.sh
|
||||
|
||||
engines:
|
||||
- type: docker
|
||||
image: debian:stable-slim
|
||||
setup:
|
||||
- type: apt
|
||||
packages:
|
||||
- procps
|
||||
|
||||
runners:
|
||||
- type: executable
|
||||
- type: nextflow
|
||||
44
src/io/untar/config.vsh.yaml
Normal file
44
src/io/untar/config.vsh.yaml
Normal file
@@ -0,0 +1,44 @@
|
||||
name: untar
|
||||
namespace: io
|
||||
description: |
|
||||
Unpack a .tar file. When the contents of the .tar file is just a single directory,
|
||||
put the contents of the directory into the output folder instead of that directory.
|
||||
argument_groups:
|
||||
- name: Input arguments
|
||||
arguments:
|
||||
- name: --input
|
||||
description: Tarball file to be unpacked.
|
||||
type: file
|
||||
required: true
|
||||
- name: Output arguments
|
||||
arguments:
|
||||
- name: --output
|
||||
description: Directory to write the contents of the .tar file to.
|
||||
type: file
|
||||
direction: output
|
||||
required: true
|
||||
- name: "Other arguments"
|
||||
arguments:
|
||||
- name: "--exclude"
|
||||
alternatives: ["-e"]
|
||||
type: string
|
||||
description: Prevents any file or member whose name matches the shell wildcard (pattern) from being extracted.
|
||||
example: "docs/figures"
|
||||
required: false
|
||||
resources:
|
||||
- type: bash_script
|
||||
path: script.sh
|
||||
test_resources:
|
||||
- type: bash_script
|
||||
path: test.sh
|
||||
engines:
|
||||
- type: docker
|
||||
image: debian:stable-slim
|
||||
setup:
|
||||
- type: apt
|
||||
packages:
|
||||
- procps
|
||||
|
||||
runners:
|
||||
- type: executable
|
||||
- type: nextflow
|
||||
41
src/io/untar/script.sh
Normal file
41
src/io/untar/script.sh
Normal file
@@ -0,0 +1,41 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
set -eo pipefail
|
||||
|
||||
extra_args=()
|
||||
|
||||
TMPDIR=$(mktemp -d "$meta_temp_dir/$meta_functionality_name-XXXXXX")
|
||||
function clean_up {
|
||||
[[ -d "$TMPDIR" ]] && rm -r "$TMPDIR"
|
||||
}
|
||||
trap clean_up EXIT
|
||||
|
||||
# Check if tarball contains 1 top-level directory. If so, extract the contents of the
|
||||
# directory to the output directory instead of the directory itself.
|
||||
echo "Directory contents:"
|
||||
tar -taf "${par_input}" > "$TMPDIR/tar_contents.txt"
|
||||
cat "$TMPDIR/tar_contents.txt"
|
||||
|
||||
printf "Checking if tarball contains only a single top-level directory: "
|
||||
if [[ $(grep -o -E '^[./]*[^/]+/$' "$TMPDIR/tar_contents.txt" | uniq | wc -l) -eq 1 ]]; then
|
||||
echo "It does."
|
||||
echo "Extracting the contents of the top-level directory to the output directory instead of the directory itself."
|
||||
# The directory can be both of the format './<directory>' (or ././<directory>) or just <directory>
|
||||
# Adjust the number of stripped components accordingly by looking for './' at the beginning of the file.
|
||||
# The dot is escaped so that only a literal './' prefix is counted. An unescaped '.'
# also matches a one-character directory name such as 'a/', which would strip one
# path component too many.
starting_relative=$(grep -oP -m 1 '^(\./)*' "$TMPDIR/tar_contents.txt" | tr -d '\n' | wc -c)
|
||||
n_strips=$(( ($starting_relative / 2)+1 ))
|
||||
extra_args+=("--strip-components=$n_strips")
|
||||
else
|
||||
echo "It does not."
|
||||
fi
|
||||
|
||||
if [ "$par_exclude" != "" ]; then
|
||||
echo "Exclusion of files with wildcard '$par_exclude' requested."
|
||||
extra_args+=("--exclude=$par_exclude")
|
||||
fi
|
||||
|
||||
echo "Starting extraction of tarball '$par_input' to output directory '$par_output'."
|
||||
mkdir -p "$par_output"
|
||||
echo "executing 'tar --no-same-owner --no-same-permissions --directory=$par_output ${extra_args[@]} -xavf $par_input'"
|
||||
# Quote the array expansion so every extra argument (e.g. an --exclude pattern that
# contains spaces) is handed to tar as exactly one word.
tar --no-same-owner --no-same-permissions --directory="$par_output" "${extra_args[@]}" -xavf "$par_input"
|
||||
|
||||
126
src/io/untar/test.sh
Normal file
126
src/io/untar/test.sh
Normal file
@@ -0,0 +1,126 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
set -eo pipefail
|
||||
|
||||
# create tempdir
|
||||
echo ">>> Creating temporary test directory."
|
||||
TMPDIR=$(mktemp -d "$meta_temp_dir/$meta_functionality_name-XXXXXX")
|
||||
function clean_up {
|
||||
[[ -d "$TMPDIR" ]] && rm -r "$TMPDIR"
|
||||
}
|
||||
trap clean_up EXIT
|
||||
echo ">>> Created temporary directory '$TMPDIR'."
|
||||
|
||||
INPUT_FILE="$TMPDIR/test_file.txt"
|
||||
echo ">>> Creating test input file at '$TMPDIR/test_file.txt'."
|
||||
echo "foo" > "$INPUT_FILE"
|
||||
echo ">>> Created '$INPUT_FILE'."
|
||||
|
||||
echo ">>> Creating tar.gz from '$INPUT_FILE'."
|
||||
TARFILE="${INPUT_FILE}.tar.gz"
|
||||
tar -C "$TMPDIR" -czvf ${INPUT_FILE}.tar.gz $(basename "$INPUT_FILE")
|
||||
[[ ! -f "$TARFILE" ]] && echo ">>> Test setup failed: could not create tarfile." && exit 1
|
||||
echo ">>> '$TARFILE' created."
|
||||
|
||||
echo ">>> Check whether tar.gz can be extracted"
|
||||
echo ">>> Creating temporary output directory for test 1."
|
||||
OUTPUT_DIR_1="$TMPDIR/output_test_1/"
|
||||
mkdir "$OUTPUT_DIR_1"
|
||||
|
||||
echo ">>> Extracting '$TARFILE' to '$OUTPUT_DIR_1'".
|
||||
./$meta_functionality_name \
|
||||
--input "$TARFILE" \
|
||||
--output "$OUTPUT_DIR_1"
|
||||
|
||||
echo ">>> Check whether extracted file exists"
|
||||
[[ ! -f "$OUTPUT_DIR_1/test_file.txt" ]] && echo "Output file could not be found. Output directory contents: " && ls "$OUTPUT_DIR_1" && exit 1
|
||||
|
||||
echo ">>> Creating temporary output directory for test 2."
|
||||
OUTPUT_DIR_2="$TMPDIR/output_test_2/"
|
||||
mkdir "$OUTPUT_DIR_2"
|
||||
|
||||
# Name the excluded file literally; it is passed as a literal to --exclude below.
echo ">>> Extracting '$TARFILE' to '$OUTPUT_DIR_2', excluding 'test_file.txt'".
|
||||
./$meta_functionality_name \
|
||||
--input "$TARFILE" \
|
||||
--output "$OUTPUT_DIR_2" \
|
||||
--exclude 'test_file.txt'
|
||||
|
||||
echo ">>> Check whether excluded file was not extracted"
|
||||
[[ -f "$OUTPUT_DIR_2/test_file.txt" ]] && echo "File should have been excluded! Output directory contents:" && ls "$OUTPUT_DIR_2" && exit 1
|
||||
|
||||
echo ">>> Creating test tarball containing only 1 top-level directory."
|
||||
mkdir "$TMPDIR/input_test_3/"
|
||||
cp "$INPUT_FILE" "$TMPDIR/input_test_3/"
|
||||
tar -C "$TMPDIR" -czvf "$TMPDIR/input_test_3.tar.gz" $(basename "$TMPDIR/input_test_3")
|
||||
TARFILE_3="$TMPDIR/input_test_3.tar.gz"
|
||||
|
||||
echo ">>> Creating temporary output directory for test 3."
|
||||
OUTPUT_DIR_3="$TMPDIR/output_test_3/"
|
||||
mkdir "$OUTPUT_DIR_3"
|
||||
|
||||
echo "Extracting '$TARFILE_3' to '$OUTPUT_DIR_3'".
|
||||
./$meta_functionality_name \
|
||||
--input "$TARFILE_3" \
|
||||
--output "$OUTPUT_DIR_3"
|
||||
|
||||
echo ">>> Check whether extracted file exists"
|
||||
[[ ! -f "$OUTPUT_DIR_3/test_file.txt" ]] && echo "Output file could not be found!" && exit 1
|
||||
|
||||
echo ">>> Check for tar archive that contains a single directory starting with './'."
|
||||
mkdir "$TMPDIR/input_test_4/"
|
||||
cp "$INPUT_FILE" "$TMPDIR/input_test_4/"
|
||||
|
||||
pushd "$TMPDIR/"
|
||||
trap popd ERR
|
||||
tar -czvf "$TMPDIR/input_test_4.tar.gz" ./input_test_4
|
||||
popd
|
||||
trap - ERR
|
||||
|
||||
OUTPUT_DIR_4="$TMPDIR/output_test_4/"
|
||||
echo "Extracting '$TMPDIR/input_test_4.tar.gz' to '$OUTPUT_DIR_4'".
|
||||
./$meta_functionality_name \
|
||||
--input "$TMPDIR/input_test_4.tar.gz" \
|
||||
--output "$OUTPUT_DIR_4"
|
||||
|
||||
echo ">>> Check whether extracted file exists"
|
||||
[[ ! -f "$OUTPUT_DIR_4/test_file.txt" ]] && echo "Output file could not be found!" && exit 1
|
||||
|
||||
echo ">>> Creating test tarball containing only 1 top-level directory, but it is nested."
|
||||
mkdir -p "$TMPDIR/input_test_5/nested/"
|
||||
cp "$INPUT_FILE" "$TMPDIR/input_test_5/nested/"
|
||||
tar -C "$TMPDIR" -czvf "$TMPDIR/input_test_5.tar.gz" $(basename "$TMPDIR/input_test_5")
|
||||
TARFILE_5="$TMPDIR/input_test_5.tar.gz"
|
||||
|
||||
echo ">>> Creating temporary output directory for test 5."
|
||||
OUTPUT_DIR_5="$TMPDIR/output_test_5/"
|
||||
mkdir "$OUTPUT_DIR_5"
|
||||
|
||||
echo "Extracting '$TARFILE_5' to '$OUTPUT_DIR_5'".
|
||||
./$meta_functionality_name \
|
||||
--input "$TARFILE_5" \
|
||||
--output "$OUTPUT_DIR_5"
|
||||
|
||||
echo ">>> Check whether extracted file exists"
|
||||
[[ ! -f "$OUTPUT_DIR_5/nested/test_file.txt" ]] && echo "Output file could not be found!" && exit 1
|
||||
|
||||
echo ">>> Creating test tarball containing two top-level directories."
|
||||
mkdir -p "$TMPDIR/input_test_6/number_1/"
|
||||
mkdir "$TMPDIR/input_test_6/number_2/"
|
||||
cp "$INPUT_FILE" "$TMPDIR/input_test_6/number_1/"
|
||||
tar -C "$TMPDIR" -czvf "$TMPDIR/input_test_6.tar.gz" $(basename "$TMPDIR/input_test_6")
|
||||
TARFILE_6="$TMPDIR/input_test_6.tar.gz"
|
||||
|
||||
echo ">>> Creating temporary output directory for test 6."
|
||||
OUTPUT_DIR_6="$TMPDIR/output_test_6/"
|
||||
mkdir "$OUTPUT_DIR_6"
|
||||
|
||||
echo "Extracting '$TARFILE_6' to '$OUTPUT_DIR_6'".
|
||||
./$meta_functionality_name \
|
||||
--input "$TARFILE_6" \
|
||||
--output "$OUTPUT_DIR_6"
|
||||
|
||||
echo ">>> Check whether extracted file exists"
|
||||
[[ ! -f "$OUTPUT_DIR_6/number_1/test_file.txt" ]] && echo "Output file could not be found!" && exit 1
|
||||
[[ ! -d "$OUTPUT_DIR_6/number_2" ]] && echo "Output directory could not be found!" && exit 1
|
||||
|
||||
echo ">>> Test finished successfully"
|
||||
86
src/runner/config.vsh.yaml
Normal file
86
src/runner/config.vsh.yaml
Normal file
@@ -0,0 +1,86 @@
|
||||
name: runner
|
||||
description: Runner for demultiplexing of raw sequencing data
|
||||
argument_groups:
|
||||
- name: Input arguments
|
||||
arguments:
|
||||
- name: --input
|
||||
description: |
|
||||
Base directory of the canonical form `s3://<bucket>/<path>/<RunID>/`.
|
||||
A tarball (.tar.gz, .tgz, .tar) containing run information can be provided.
|
||||
The <RunID> is the value passed to the `id` argument.
|
||||
type: file
|
||||
required: true
|
||||
- name: --run_information
|
||||
description: |
|
||||
CSV file containing sample information, which will be used as
|
||||
input for the demultiplexer. Canonically called 'SampleSheet.csv' (Illumina)
|
||||
or 'RunManifest.csv' (Element Biosciences). If not specified,
|
||||
will try to autodetect the sample sheet in the input directory.
|
||||
Requires --demultiplexer to be set.
|
||||
type: file
|
||||
required: false
|
||||
- name: "--demultiplexer"
|
||||
type: string
|
||||
required: false
|
||||
choices: ["bases2fastq", "bclconvert"]
|
||||
description: |
|
||||
Demultiplexer to use, choice depends on the provider
|
||||
of the instrument that was used to generate the data.
|
||||
When not using --run_information, specifying this argument is not
|
||||
required.
|
||||
- name: Annotation flags
|
||||
arguments:
|
||||
- name: --plain_output
|
||||
description: |
|
||||
Flag to indicate that the output should be stored directly under $publish_dir rather than
|
||||
under a subdirectory structure runID/<date_time>_demultiplex_<version>/.
|
||||
type: boolean_true
|
||||
- name: Output arguments
|
||||
arguments:
|
||||
- name: --fastq_output
|
||||
type: file
|
||||
direction: output
|
||||
default: "fastq"
|
||||
- name: --sample_qc_output
|
||||
type: file
|
||||
direction: output
|
||||
default: "qc/fastqc"
|
||||
- name: --multiqc_output
|
||||
type: file
|
||||
direction: output
|
||||
default: "qc/multiqc_report.html"
|
||||
- name: "--demultiplexer_logs"
|
||||
type: file
|
||||
direction: output
|
||||
default: "demultiplexer_logs"
|
||||
- name: "Other arguments"
|
||||
arguments:
|
||||
- name: --skip_copycomplete_check
|
||||
type: boolean_true
|
||||
description: |
|
||||
Disable the check for the presence of a "CopyComplete.txt" file in input
|
||||
directory in case of Illumina data.
|
||||
resources:
|
||||
- type: nextflow_script
|
||||
path: main.nf
|
||||
entrypoint: run_wf
|
||||
- path: disable_publish_processes.config
|
||||
test_resources:
|
||||
- type: nextflow_script
|
||||
path: test.nf
|
||||
entrypoint: test
|
||||
|
||||
dependencies:
|
||||
- name: demultiplex
|
||||
repository: local
|
||||
- name: io/publish
|
||||
repository: local
|
||||
|
||||
runners:
|
||||
- type: nextflow
|
||||
config:
|
||||
script:
|
||||
- includeConfig("disable_publish_processes.config")
|
||||
|
||||
engines:
|
||||
- type: native
|
||||
9
src/runner/disable_publish_processes.config
Normal file
9
src/runner/disable_publish_processes.config
Normal file
@@ -0,0 +1,9 @@
|
||||
process {
|
||||
withName: publishFilesProc {
|
||||
publishDir = [ enabled: false ]
|
||||
}
|
||||
|
||||
withName: publishStatesProc {
|
||||
publishDir = [ enabled: false ]
|
||||
}
|
||||
}
|
||||
16
src/runner/integration_tests.sh
Executable file
16
src/runner/integration_tests.sh
Executable file
@@ -0,0 +1,16 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
# get the root of the directory
|
||||
REPO_ROOT=$(git rev-parse --show-toplevel)
|
||||
|
||||
# ensure that the command below is run from the root of the repository
|
||||
cd "$REPO_ROOT"
|
||||
|
||||
viash ns build --setup cb -q runner
|
||||
|
||||
nextflow run . \
|
||||
-main-script src/runner/test.nf \
|
||||
-entry test \
|
||||
-profile docker,local \
|
||||
-c src/config/labels.config \
|
||||
-resume
|
||||
177
src/runner/main.nf
Normal file
177
src/runner/main.nf
Normal file
@@ -0,0 +1,177 @@
|
||||
import java.util.concurrent.ThreadPoolExecutor
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
|
||||
// Timestamp used in the publish directory name. 'HH' is the 24-hour clock;
// 'hh' (12-hour) would give the same value for e.g. 01:00 and 13:00.
def date = new Date().format('yyyyMMdd_HHmmss')
|
||||
def viash_config = java.nio.file.Paths.get("${moduleDir}/_viash.yaml")
|
||||
def version = get_version(viash_config)
|
||||
|
||||
session = nextflow.Nextflow.getSession()
|
||||
final service = session.publishDirExecutorService()
|
||||
|
||||
|
||||
// S3 paths containing double slashes might cause issues with empty objects being created
|
||||
// Remove trailing slashes from the publish dir. The params map is immutable, so create a copy
|
||||
def publish_dir = params.publish_dir - ~/\/+$/
|
||||
|
||||
workflow run_wf {
|
||||
take:
|
||||
input_ch
|
||||
|
||||
main:
|
||||
output_ch = input_ch
|
||||
| map { id, state ->
|
||||
// The argument names for this workflow and the demultiplex workflow may overlap
|
||||
// here, we store a copy in order to make sure to not accidentally overwrite the state.
|
||||
def new_state = state + [
|
||||
"fastq_output_workflow": state.fastq_output,
|
||||
"multiqc_output_workflow": state.multiqc_output,
|
||||
"sample_qc_output_workflow": state.sample_qc_output,
|
||||
"demultiplexer_logs_workflow": state.demultiplexer_logs,
|
||||
"run_id": id
|
||||
]
|
||||
return [id, new_state]
|
||||
}
|
||||
| demultiplex.run(
|
||||
fromState: { id, state ->
|
||||
def state_to_pass = [
|
||||
"input": state.input,
|
||||
"run_information": state.run_information,
|
||||
"demultiplexer": state.demultiplexer,
|
||||
"skip_copycomplete_check": state.skip_copycomplete_check,
|
||||
"output": "$id/fastq",
|
||||
"output_sample_qc": "$id/qc/fastqc",
|
||||
"multiqc_output": "$id/qc/multiqc_report.html",
|
||||
"demultiplexer_logs": "$id/demultiplexer_logs",
|
||||
]
|
||||
if (state.run_information) {
|
||||
state_to_pass += ["output_run_information": state.run_information.getName()]
|
||||
}
|
||||
state_to_pass
|
||||
},
|
||||
toState: { id, result, state ->
|
||||
// Duplicate the results under its own key, makes it easier to access later.
|
||||
state + result + [ to_return: result ]
|
||||
},
|
||||
)
|
||||
| map {id, state ->
|
||||
def id1 = (state.plain_output) ? id : "${state.run_id}/${date}"
|
||||
def id2 = (state.plain_output) ? id : "${id1}_demultiplex_${version}"
|
||||
def prefix = (id2 == "run") ? "" : "${id2}/"
|
||||
def new_state = state + ["prefix": prefix]
|
||||
[id, new_state]
|
||||
}
|
||||
| publish.run(
|
||||
fromState: { id, state ->
|
||||
def prefix = state.prefix
|
||||
// These output names are determined by arguments.
|
||||
def fastq_output_1 = "${prefix}${state.fastq_output_workflow}"
|
||||
def sample_qc_output_1 = "${prefix}${state.sample_qc_output_workflow}"
|
||||
def multiqc_output_1 = "${prefix}${state.multiqc_output_workflow}"
|
||||
def demultiplexer_logs_output = "${prefix}${state.demultiplexer_logs_workflow}"
|
||||
// The name of the output file for the run information is determined by the input file name.
|
||||
def run_information_output_1 = "${prefix}${state.output_run_information.getName()}"
|
||||
|
||||
println("Publishing to ${publish_dir}/${prefix}")
|
||||
[
|
||||
input: state.output,
|
||||
input_sample_qc: state.output_sample_qc,
|
||||
input_multiqc: state.multiqc_output,
|
||||
input_run_information: state.output_run_information,
|
||||
input_demultiplexer_logs: state.demultiplexer_logs,
|
||||
output: fastq_output_1,
|
||||
output_sample_qc: sample_qc_output_1,
|
||||
output_multiqc: multiqc_output_1,
|
||||
output_run_information: run_information_output_1,
|
||||
output_demultiplexer_logs: demultiplexer_logs_output,
|
||||
]
|
||||
},
|
||||
toState: { id, result, state -> [
|
||||
"fastq_output": result.output,
|
||||
"prefix": state.prefix,
|
||||
"multiqc_output": result.output_multiqc,
|
||||
"sample_qc_output": result.output_sample_qc,
|
||||
"demultiplexer_logs": result.output_demultiplexer_logs,
|
||||
]
|
||||
},
|
||||
directives: [
|
||||
publishDir: [
|
||||
path: publish_dir,
|
||||
overwrite: false,
|
||||
mode: "copy"
|
||||
]
|
||||
]
|
||||
)
|
||||
|
||||
has_published = new AtomicBoolean(false)
|
||||
|
||||
interval_ch = channel.interval('10s'){ i ->
|
||||
// Allow this channel to stop generating events based on a later signal
|
||||
if (has_published.get()) {
|
||||
return channel.STOP
|
||||
}
|
||||
i
|
||||
}
|
||||
|
||||
await_ch = output_ch
|
||||
// Wait for demultiplexing processes to be done
|
||||
| toSortedList()
|
||||
// Create periodic events in order to check for the publishing to be done
|
||||
| combine(interval_ch)
|
||||
| until { event ->
|
||||
println("Checking if publishing has finished in service ${service}")
|
||||
def running_tasks = null
|
||||
if(service instanceof ThreadPoolExecutor) {
|
||||
def completed_tasks = service.getCompletedTaskCount()
|
||||
def task_count = service.getTaskCount()
|
||||
// Tasks scheduled so far minus tasks already completed = tasks still queued or running.
running_tasks = task_count - completed_tasks
|
||||
}
|
||||
else if( System.getenv('NXF_ENABLE_VIRTUAL_THREADS') ) {
|
||||
running_tasks = service.threadCount()
|
||||
}
|
||||
else {
|
||||
error("Publishing service of class ${service.getClass()} is not supported.")
|
||||
}
|
||||
|
||||
if (running_tasks == 0) {
|
||||
println("Publishing has finished all current tasks. Continuing execution.")
|
||||
return true
|
||||
}
|
||||
println("Workflow is publishing. Waiting...")
|
||||
return false
|
||||
}
|
||||
| last()
|
||||
| map{ event ->
|
||||
// Signal to interval channel to stop generating events.
|
||||
has_published.compareAndSet(false, true)
|
||||
return event[0]
|
||||
}
|
||||
| map {id, state ->
|
||||
// Log message names the marker file exactly as it is written below.
println("Creating transfer_completed.txt file.")
|
||||
// Build the marker path from the slash-trimmed publish_dir. state.prefix is either
// empty or already ends with '/', so no extra separator is added (avoids '//' in S3 paths).
def complete_file = file("${publish_dir}/${state.prefix}transfer_completed.txt")
|
||||
complete_file.text = "" // This will create a file when it does not exist.
|
||||
[id, state]
|
||||
}
|
||||
| setState([
|
||||
"fastq_output",
|
||||
"multiqc_output",
|
||||
"sample_qc_output",
|
||||
"demultiplexer_logs"
|
||||
])
|
||||
|
||||
|
||||
emit:
|
||||
await_ch
|
||||
}
|
||||
|
||||
// Read the version from a viash YAML config file.
// Returns "unknown_version" when the file does not exist or when the parsed
// config has no (truthy) `version` entry; otherwise returns that entry.
// Prints the resolved version when the file was found and parsed.
def get_version(input) {
|
||||
def inputFile = file(input)
|
||||
if (!inputFile.exists()) {
|
||||
// When executing tests
|
||||
return "unknown_version"
|
||||
}
|
||||
def yamlSlurper = new groovy.yaml.YamlSlurper()
|
||||
def loaded_viash_config = yamlSlurper.parse(inputFile)
|
||||
def version = (loaded_viash_config.version) ? loaded_viash_config.version : "unknown_version"
|
||||
println("Version of demultiplex to be used: ${version}")
|
||||
return version
|
||||
}
|
||||
20
src/runner/nextflow.config
Normal file
20
src/runner/nextflow.config
Normal file
@@ -0,0 +1,20 @@
|
||||
manifest {
|
||||
nextflowVersion = '!>=20.12.1-edge'
|
||||
}
|
||||
|
||||
process {
|
||||
withName: publishStatesProc {
|
||||
publishDir = [ enabled: false ]
|
||||
}
|
||||
|
||||
withName: publishFilesProc {
|
||||
publishDir = [ enabled: false ]
|
||||
}
|
||||
}
|
||||
|
||||
params {
|
||||
rootDir = java.nio.file.Paths.get("$projectDir/../../").toAbsolutePath().normalize().toString()
|
||||
}
|
||||
|
||||
// include common settings
|
||||
includeConfig("${params.rootDir}/src/config/labels.config")
|
||||
111
src/runner/test.nf
Normal file
111
src/runner/test.nf
Normal file
@@ -0,0 +1,111 @@
|
||||
import java.nio.file.Files
|
||||
import nextflow.exception.WorkflowScriptErrorException
|
||||
|
||||
// Create temporary directory for the publish_dir if it is not defined
|
||||
if (!params.publish_dir && params.publishDir) {
|
||||
params.publish_dir = params.publishDir
|
||||
}
|
||||
|
||||
if (!params.publish_dir) {
|
||||
def tempDir = Files.createTempDirectory("demultiplex_runner_integration_test")
|
||||
println "Created temp directory: $tempDir"
|
||||
// Register shutdown hook to delete it on JVM exit
|
||||
Runtime.runtime.addShutdownHook(new Thread({
|
||||
try {
|
||||
// Delete directory recursively
|
||||
Files.walk(tempDir)
|
||||
.sorted(Comparator.reverseOrder())
|
||||
.forEach { Files.delete(it) }
|
||||
println "Deleted temp directory: $tempDir"
|
||||
} catch (Exception e) {
|
||||
println "Failed to delete temp directory: $e"
|
||||
}
|
||||
}))
|
||||
params.publish_dir = tempDir
|
||||
}
|
||||
// The module inherits the parameters defined before the include statement,
|
||||
// therefore any parameters set afterwards will not be used by the module.
|
||||
|
||||
include { runner } from params.rootDir + "/target/nextflow/runner/main.nf"
|
||||
params.resources_test = params.rootDir + "/testData/"
|
||||
|
||||
workflow test {
|
||||
output_ch = Channel.fromList([
|
||||
[
|
||||
id: "test",
|
||||
input: params.resources_test + "200624_A00834_0183_BHMTFYDRXX.tar.gz",
|
||||
]
|
||||
])
|
||||
| map {event -> [event.id, event] }
|
||||
| runner.run(
|
||||
fromState: {id, state -> state }
|
||||
)
|
||||
|
||||
all_events_ch = output_ch
|
||||
| toSortedList()
|
||||
| map{states ->
|
||||
assert states.size() == 1
|
||||
}
|
||||
|
||||
output_ch
|
||||
| map {id, state ->
|
||||
assert id == "test"
|
||||
assert state.fastq_output.isDirectory()
|
||||
assert state.sample_qc_output.isDirectory()
|
||||
assert state.multiqc_output.isFile()
|
||||
assert state.demultiplexer_logs.isDirectory()
|
||||
}
|
||||
|
||||
workflow.onComplete = {
|
||||
try {
|
||||
// Nextflow only allows exceptions generated using the 'error' function (which throws WorkflowScriptErrorException).
|
||||
// So in order for the assert statement to work (or allow other errors to let the tests to fail)
|
||||
// We need to wrap these in WorkflowScriptErrorException. See https://github.com/nextflow-io/nextflow/pull/4458/files
|
||||
// The error message will show up in .nextflow.log
|
||||
def publish_subdir = file("${params.publish_dir}/test")
|
||||
assert publish_subdir.isDirectory()
|
||||
def all_files = publish_subdir.listFiles()
|
||||
assert all_files.size() == 1
|
||||
def publish_dir = file(all_files[0])
|
||||
// version can be unknown_version (local tests) or actual version configured in _viash.yaml
|
||||
// with the new approach to fetching the version from _viash.yaml, this will be the branch name during CI builds
|
||||
// Disabling this test temporarily and creating an issue for it
|
||||
// assert publish_dir.name.endsWith("_demultiplex_unknown_version")
|
||||
def published_items = publish_dir.listFiles()
|
||||
assert published_items.size() == 5
|
||||
assert published_items.collect{it.name}.toSet() == ["demultiplexer_logs", "fastq", "qc", "SampleSheet.csv", "transfer_completed.txt"].toSet()
|
||||
def fastqc_files = publish_dir.resolve("qc/fastqc").listFiles()
|
||||
assert fastqc_files.collect{it.name}.toSet() == [
|
||||
"Sample1_S1_L001_R1_001_fastqc_data.txt",
|
||||
"Sample1_S1_L001_R1_001_fastqc_report.html",
|
||||
"Sample1_S1_L001_R1_001_summary.txt",
|
||||
"Sample23_S3_L001_R1_001_fastqc_data.txt",
|
||||
"Sample23_S3_L001_R1_001_fastqc_report.html",
|
||||
"Sample23_S3_L001_R1_001_summary.txt",
|
||||
"SampleA_S2_L001_R1_001_fastqc_data.txt",
|
||||
"SampleA_S2_L001_R1_001_fastqc_report.html",
|
||||
"SampleA_S2_L001_R1_001_summary.txt",
|
||||
"sampletest_S4_L001_R1_001_fastqc_data.txt",
|
||||
"sampletest_S4_L001_R1_001_fastqc_report.html",
|
||||
"sampletest_S4_L001_R1_001_summary.txt",
|
||||
"Undetermined_S0_L001_R1_001_fastqc_data.txt",
|
||||
"Undetermined_S0_L001_R1_001_fastqc_report.html",
|
||||
"Undetermined_S0_L001_R1_001_summary.txt"
|
||||
].toSet()
|
||||
assert publish_dir.resolve("qc/multiqc_report.html").exists()
|
||||
def fastq_files = publish_dir.resolve("fastq").listFiles()
|
||||
assert fastq_files.collect{it.name}.toSet() == [
|
||||
"Sample1_S1_L001_R1_001.fastq.gz",
|
||||
"Sample23_S3_L001_R1_001.fastq.gz",
|
||||
"SampleA_S2_L001_R1_001.fastq.gz",
|
||||
"sampletest_S4_L001_R1_001.fastq.gz",
|
||||
"Undetermined_S0_L001_R1_001.fastq.gz"
|
||||
].toSet()
|
||||
assert publish_dir.resolve("SampleSheet.csv").exists()
|
||||
} catch (Exception e) {
|
||||
throw new WorkflowScriptErrorException("Integration test failed!", e)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user