Build branch main with version main (45cd694)

Build pipeline: viash-hub.demultiplex.main-zdtf2

Source commit: 45cd6947ef

Source message: Allow checking for end of publishing when used as a subworkflow (#59)
This commit is contained in:
CI
2025-08-08 14:28:46 +00:00
parent 214329783f
commit 89d8d58ba9
23 changed files with 180 additions and 82 deletions

View File

@@ -1,8 +1,14 @@
def date = new Date().format('yyyyMMdd_hhmmss')
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.atomic.AtomicBoolean
def date = new Date().format('yyyyMMdd_hhmmss')
def viash_config = java.nio.file.Paths.get("${moduleDir}/_viash.yaml")
def version = get_version(viash_config)
session = nextflow.Nextflow.getSession()
final service = session.publishDirExecutorService()
workflow run_wf {
take:
input_ch
@@ -51,13 +57,16 @@ workflow run_wf {
state + result + [ to_return: result ]
},
)
| publish.run(
fromState: { id, state ->
println(state.plain_output)
| map {id, state ->
def id1 = (state.plain_output) ? id : "${state.run_id}/${date}"
def id2 = (state.plain_output) ? id : "${id1}_demultiplex_${version}"
def prefix = (id2 == "run") ? "" : "${id2}/"
def new_state = state + ["prefix": prefix]
[id, new_state]
}
| publish.run(
fromState: { id, state ->
def prefix = state.prefix
// These output names are determined by arguments.
def fastq_output_1 = "${prefix}${state.fastq_output_workflow}"
def sample_qc_output_1 = "${prefix}${state.sample_qc_output_workflow}"
@@ -67,18 +76,6 @@ workflow run_wf {
def run_information_output_1 = "${prefix}${state.output_run_information.getName()}"
println("Publising to ${params.publish_dir}/${prefix}")
// Create a file to indicate that the publishing (transfer) of files has been completed.
// Multiple items can be added to onCompleteActions; which is required when processing multiple sequencing runs at a time.
// Alternatively setOnComplete could be used to add actions, but that only adds them at the end of the list (which is executed in order).
// The 'completed.txt' file must be created before the onComplete of the integration tests are run, so we need to prepend to the list.
workflow.onCompleteActions.add(0, {
if (workflow.exitStatus == 0) {
def complete_file = file("${params.publish_dir}/${prefix}/transfer_completed.txt")
complete_file.text = "" // This will create a file when it does not exist.
}
})
[
input: state.output,
input_sample_qc: state.output_sample_qc,
@@ -92,7 +89,7 @@ workflow run_wf {
output_demultiplexer_logs: demultiplexer_logs_output,
]
},
toState: { id, result, state -> [ fastq_output: state.to_return.output ] },
toState: { id, result, state -> [ "fastq_output": state.to_return.output, "prefix": state.prefix ] },
directives: [
publishDir: [
path: "${params.publish_dir}",
@@ -102,8 +99,60 @@ workflow run_wf {
]
)
has_published = new AtomicBoolean(false)
interval_ch = channel.interval('10s'){ i ->
// Allow this channel to stop generating events based on a later signal
if (has_published.get()) {
return channel.STOP
}
i
}
await_ch = output_ch
// Wait for demultiplexing processes to be done
| toSortedList()
// Create periodic events in order to check for the publishing to be done
| combine(interval_ch)
| until { event ->
println("Checking if publishing has finished in service ${service}")
def running_tasks = null
if(service instanceof ThreadPoolExecutor) {
def completed_tasks = service.getCompletedTaskCount()
def task_count = service.getTaskCount()
running_tasks = completed_tasks - task_count
}
else if( System.getenv('NXF_ENABLE_VIRTUAL_THREADS') ) {
running_tasks = service.threadCount()
}
else {
error("Publishing service of class ${service.getClass()} is not supported.")
}
if (running_tasks == 0) {
println("Publishing has finished all current tasks. Continuing execution.")
return true
}
println("Workflow is publishing. Waiting...")
return false
}
| last()
| map{ event ->
// Signal to interval channel to stop generating events.
has_published.compareAndSet(false, true)
return event[0]
}
| map {id, state ->
println("Creating transfer_complete.txt file.")
def complete_file = file("${params.publish_dir}/${state.prefix}/transfer_completed.txt")
complete_file.text = "" // This will create a file when it does not exist.
[id, state]
}
| setState(["fastq_output"])
emit:
output_ch
await_ch
}
def get_version(input) {

View File

@@ -160,9 +160,9 @@ build_info:
output: "target/executable/io/interop_summary_to_csv"
executable: "target/executable/io/interop_summary_to_csv/interop_summary_to_csv"
viash_version: "0.9.4"
git_commit: "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805"
git_commit: "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae"
git_remote: "https://github.com/viash-hub/demultiplex"
git_tag: "v0.1.1-37-g0d4c458"
git_tag: "v0.1.1-38-g45cd694"
package_config:
name: "demultiplex"
version: "main"

View File

@@ -454,9 +454,9 @@ tar -C /tmp/ --no-same-owner --no-same-permissions -xvf /tmp/interop.tar.gz && \
mv /tmp/interop-1.3.1-Linux-GNU/bin/index-summary /tmp/interop-1.3.1-Linux-GNU/bin/summary /usr/local/bin/
LABEL org.opencontainers.image.description="Companion container for running component io interop_summary_to_csv"
LABEL org.opencontainers.image.created="2025-08-05T10:55:23Z"
LABEL org.opencontainers.image.created="2025-08-08T14:01:33Z"
LABEL org.opencontainers.image.source="https://github.com/viash-hub/demultiplex"
LABEL org.opencontainers.image.revision="0d4c4584f0d18ab18c21e7d71c43b8deed2c4805"
LABEL org.opencontainers.image.revision="45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae"
LABEL org.opencontainers.image.version="main"
VIASHDOCKER

View File

@@ -222,9 +222,9 @@ build_info:
output: "target/executable/io/publish"
executable: "target/executable/io/publish/publish"
viash_version: "0.9.4"
git_commit: "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805"
git_commit: "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae"
git_remote: "https://github.com/viash-hub/demultiplex"
git_tag: "v0.1.1-37-g0d4c458"
git_tag: "v0.1.1-38-g45cd694"
package_config:
name: "demultiplex"
version: "main"

View File

@@ -450,9 +450,9 @@ RUN apt-get update && \
rm -rf /var/lib/apt/lists/*
LABEL org.opencontainers.image.description="Companion container for running component io publish"
LABEL org.opencontainers.image.created="2025-08-05T10:55:23Z"
LABEL org.opencontainers.image.created="2025-08-08T14:01:33Z"
LABEL org.opencontainers.image.source="https://github.com/viash-hub/demultiplex"
LABEL org.opencontainers.image.revision="0d4c4584f0d18ab18c21e7d71c43b8deed2c4805"
LABEL org.opencontainers.image.revision="45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae"
LABEL org.opencontainers.image.version="main"
VIASHDOCKER

View File

@@ -159,9 +159,9 @@ build_info:
output: "target/executable/io/untar"
executable: "target/executable/io/untar/untar"
viash_version: "0.9.4"
git_commit: "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805"
git_commit: "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae"
git_remote: "https://github.com/viash-hub/demultiplex"
git_tag: "v0.1.1-37-g0d4c458"
git_tag: "v0.1.1-38-g45cd694"
package_config:
name: "demultiplex"
version: "main"

View File

@@ -450,9 +450,9 @@ RUN apt-get update && \
rm -rf /var/lib/apt/lists/*
LABEL org.opencontainers.image.description="Companion container for running component io untar"
LABEL org.opencontainers.image.created="2025-08-05T10:55:23Z"
LABEL org.opencontainers.image.created="2025-08-08T14:01:33Z"
LABEL org.opencontainers.image.source="https://github.com/viash-hub/demultiplex"
LABEL org.opencontainers.image.revision="0d4c4584f0d18ab18c21e7d71c43b8deed2c4805"
LABEL org.opencontainers.image.revision="45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae"
LABEL org.opencontainers.image.version="main"
VIASHDOCKER

View File

@@ -168,9 +168,9 @@ build_info:
output: "target/nextflow/dataflow/combine_samples"
executable: "target/nextflow/dataflow/combine_samples/main.nf"
viash_version: "0.9.4"
git_commit: "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805"
git_commit: "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae"
git_remote: "https://github.com/viash-hub/demultiplex"
git_tag: "v0.1.1-37-g0d4c458"
git_tag: "v0.1.1-38-g45cd694"
package_config:
name: "demultiplex"
version: "main"

View File

@@ -3235,9 +3235,9 @@ meta = [
"engine" : "native|native",
"output" : "target/nextflow/dataflow/combine_samples",
"viash_version" : "0.9.4",
"git_commit" : "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805",
"git_commit" : "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae",
"git_remote" : "https://github.com/viash-hub/demultiplex",
"git_tag" : "v0.1.1-37-g0d4c458"
"git_tag" : "v0.1.1-38-g45cd694"
},
"package_config" : {
"name" : "demultiplex",

View File

@@ -159,9 +159,9 @@ build_info:
output: "target/nextflow/dataflow/gather_fastqs_and_validate"
executable: "target/nextflow/dataflow/gather_fastqs_and_validate/main.nf"
viash_version: "0.9.4"
git_commit: "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805"
git_commit: "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae"
git_remote: "https://github.com/viash-hub/demultiplex"
git_tag: "v0.1.1-37-g0d4c458"
git_tag: "v0.1.1-38-g45cd694"
package_config:
name: "demultiplex"
version: "main"

View File

@@ -3232,9 +3232,9 @@ meta = [
"engine" : "native|native",
"output" : "target/nextflow/dataflow/gather_fastqs_and_validate",
"viash_version" : "0.9.4",
"git_commit" : "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805",
"git_commit" : "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae",
"git_remote" : "https://github.com/viash-hub/demultiplex",
"git_tag" : "v0.1.1-37-g0d4c458"
"git_tag" : "v0.1.1-38-g45cd694"
},
"package_config" : {
"name" : "demultiplex",

View File

@@ -268,9 +268,9 @@ build_info:
output: "target/nextflow/demultiplex"
executable: "target/nextflow/demultiplex/main.nf"
viash_version: "0.9.4"
git_commit: "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805"
git_commit: "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae"
git_remote: "https://github.com/viash-hub/demultiplex"
git_tag: "v0.1.1-37-g0d4c458"
git_tag: "v0.1.1-38-g45cd694"
dependencies:
- "target/nextflow/io/untar"
- "target/nextflow/dataflow/gather_fastqs_and_validate"

View File

@@ -3380,9 +3380,9 @@ meta = [
"engine" : "native|native",
"output" : "target/nextflow/demultiplex",
"viash_version" : "0.9.4",
"git_commit" : "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805",
"git_commit" : "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae",
"git_remote" : "https://github.com/viash-hub/demultiplex",
"git_tag" : "v0.1.1-37-g0d4c458"
"git_tag" : "v0.1.1-38-g45cd694"
},
"package_config" : {
"name" : "demultiplex",

View File

@@ -168,9 +168,9 @@ build_info:
output: "target/nextflow/detect_demultiplexer"
executable: "target/nextflow/detect_demultiplexer/main.nf"
viash_version: "0.9.4"
git_commit: "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805"
git_commit: "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae"
git_remote: "https://github.com/viash-hub/demultiplex"
git_tag: "v0.1.1-37-g0d4c458"
git_tag: "v0.1.1-38-g45cd694"
package_config:
name: "demultiplex"
version: "main"

View File

@@ -3224,9 +3224,9 @@ meta = [
"engine" : "native|native",
"output" : "target/nextflow/detect_demultiplexer",
"viash_version" : "0.9.4",
"git_commit" : "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805",
"git_commit" : "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae",
"git_remote" : "https://github.com/viash-hub/demultiplex",
"git_tag" : "v0.1.1-37-g0d4c458"
"git_tag" : "v0.1.1-38-g45cd694"
},
"package_config" : {
"name" : "demultiplex",

View File

@@ -160,9 +160,9 @@ build_info:
output: "target/nextflow/io/interop_summary_to_csv"
executable: "target/nextflow/io/interop_summary_to_csv/main.nf"
viash_version: "0.9.4"
git_commit: "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805"
git_commit: "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae"
git_remote: "https://github.com/viash-hub/demultiplex"
git_tag: "v0.1.1-37-g0d4c458"
git_tag: "v0.1.1-38-g45cd694"
package_config:
name: "demultiplex"
version: "main"

View File

@@ -3233,9 +3233,9 @@ meta = [
"engine" : "docker|native",
"output" : "target/nextflow/io/interop_summary_to_csv",
"viash_version" : "0.9.4",
"git_commit" : "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805",
"git_commit" : "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae",
"git_remote" : "https://github.com/viash-hub/demultiplex",
"git_tag" : "v0.1.1-37-g0d4c458"
"git_tag" : "v0.1.1-38-g45cd694"
},
"package_config" : {
"name" : "demultiplex",

View File

@@ -222,9 +222,9 @@ build_info:
output: "target/nextflow/io/publish"
executable: "target/nextflow/io/publish/main.nf"
viash_version: "0.9.4"
git_commit: "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805"
git_commit: "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae"
git_remote: "https://github.com/viash-hub/demultiplex"
git_tag: "v0.1.1-37-g0d4c458"
git_tag: "v0.1.1-38-g45cd694"
package_config:
name: "demultiplex"
version: "main"

View File

@@ -3302,9 +3302,9 @@ meta = [
"engine" : "docker|native",
"output" : "target/nextflow/io/publish",
"viash_version" : "0.9.4",
"git_commit" : "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805",
"git_commit" : "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae",
"git_remote" : "https://github.com/viash-hub/demultiplex",
"git_tag" : "v0.1.1-37-g0d4c458"
"git_tag" : "v0.1.1-38-g45cd694"
},
"package_config" : {
"name" : "demultiplex",

View File

@@ -159,9 +159,9 @@ build_info:
output: "target/nextflow/io/untar"
executable: "target/nextflow/io/untar/main.nf"
viash_version: "0.9.4"
git_commit: "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805"
git_commit: "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae"
git_remote: "https://github.com/viash-hub/demultiplex"
git_tag: "v0.1.1-37-g0d4c458"
git_tag: "v0.1.1-38-g45cd694"
package_config:
name: "demultiplex"
version: "main"

View File

@@ -3232,9 +3232,9 @@ meta = [
"engine" : "docker|native",
"output" : "target/nextflow/io/untar",
"viash_version" : "0.9.4",
"git_commit" : "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805",
"git_commit" : "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae",
"git_remote" : "https://github.com/viash-hub/demultiplex",
"git_tag" : "v0.1.1-37-g0d4c458"
"git_tag" : "v0.1.1-38-g45cd694"
},
"package_config" : {
"name" : "demultiplex",

View File

@@ -214,9 +214,9 @@ build_info:
output: "target/nextflow/runner"
executable: "target/nextflow/runner/main.nf"
viash_version: "0.9.4"
git_commit: "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805"
git_commit: "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae"
git_remote: "https://github.com/viash-hub/demultiplex"
git_tag: "v0.1.1-37-g0d4c458"
git_tag: "v0.1.1-38-g45cd694"
dependencies:
- "target/nextflow/demultiplex"
- "target/nextflow/io/publish"

View File

@@ -3296,9 +3296,9 @@ meta = [
"engine" : "native|native",
"output" : "target/nextflow/runner",
"viash_version" : "0.9.4",
"git_commit" : "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805",
"git_commit" : "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae",
"git_remote" : "https://github.com/viash-hub/demultiplex",
"git_tag" : "v0.1.1-37-g0d4c458"
"git_tag" : "v0.1.1-38-g45cd694"
},
"package_config" : {
"name" : "demultiplex",
@@ -3344,11 +3344,17 @@ include { publish } from "${meta.resources_dir}/../../nextflow/io/publish/main.n
// inner workflow
// user-provided Nextflow code
def date = new Date().format('yyyyMMdd_hhmmss')
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.atomic.AtomicBoolean
def date = new Date().format('yyyyMMdd_hhmmss')
def viash_config = java.nio.file.Paths.get("${moduleDir}/_viash.yaml")
def version = get_version(viash_config)
session = nextflow.Nextflow.getSession()
final service = session.publishDirExecutorService()
workflow run_wf {
take:
input_ch
@@ -3397,13 +3403,16 @@ workflow run_wf {
state + result + [ to_return: result ]
},
)
| publish.run(
fromState: { id, state ->
println(state.plain_output)
| map {id, state ->
def id1 = (state.plain_output) ? id : "${state.run_id}/${date}"
def id2 = (state.plain_output) ? id : "${id1}_demultiplex_${version}"
def prefix = (id2 == "run") ? "" : "${id2}/"
def new_state = state + ["prefix": prefix]
[id, new_state]
}
| publish.run(
fromState: { id, state ->
def prefix = state.prefix
// These output names are determined by arguments.
def fastq_output_1 = "${prefix}${state.fastq_output_workflow}"
def sample_qc_output_1 = "${prefix}${state.sample_qc_output_workflow}"
@@ -3413,18 +3422,6 @@ workflow run_wf {
def run_information_output_1 = "${prefix}${state.output_run_information.getName()}"
println("Publising to ${params.publish_dir}/${prefix}")
// Create a file to indicate that the publishing (transfer) of files has been completed.
// Multiple items can be added to onCompleteActions; which is required when processing multiple sequencing runs at a time.
// Alternatively setOnComplete could be used to add actions, but that only adds them at the end of the list (which is executed in order).
// The 'completed.txt' file must be created before the onComplete of the integration tests are run, so we need to prepend to the list.
workflow.onCompleteActions.add(0, {
if (workflow.exitStatus == 0) {
def complete_file = file("${params.publish_dir}/${prefix}/transfer_completed.txt")
complete_file.text = "" // This will create a file when it does not exist.
}
})
[
input: state.output,
input_sample_qc: state.output_sample_qc,
@@ -3438,7 +3435,7 @@ workflow run_wf {
output_demultiplexer_logs: demultiplexer_logs_output,
]
},
toState: { id, result, state -> [ fastq_output: state.to_return.output ] },
toState: { id, result, state -> [ "fastq_output": state.to_return.output, "prefix": state.prefix ] },
directives: [
publishDir: [
path: "${params.publish_dir}",
@@ -3448,8 +3445,60 @@ workflow run_wf {
]
)
has_published = new AtomicBoolean(false)
interval_ch = channel.interval('10s'){ i ->
// Allow this channel to stop generating events based on a later signal
if (has_published.get()) {
return channel.STOP
}
i
}
await_ch = output_ch
// Wait for demultiplexing processes to be done
| toSortedList()
// Create periodic events in order to check for the publishing to be done
| combine(interval_ch)
| until { event ->
println("Checking if publishing has finished in service ${service}")
def running_tasks = null
if(service instanceof ThreadPoolExecutor) {
def completed_tasks = service.getCompletedTaskCount()
def task_count = service.getTaskCount()
running_tasks = completed_tasks - task_count
}
else if( System.getenv('NXF_ENABLE_VIRTUAL_THREADS') ) {
running_tasks = service.threadCount()
}
else {
error("Publishing service of class ${service.getClass()} is not supported.")
}
if (running_tasks == 0) {
println("Publishing has finished all current tasks. Continuing execution.")
return true
}
println("Workflow is publishing. Waiting...")
return false
}
| last()
| map{ event ->
// Signal to interval channel to stop generating events.
has_published.compareAndSet(false, true)
return event[0]
}
| map {id, state ->
println("Creating transfer_complete.txt file.")
def complete_file = file("${params.publish_dir}/${state.prefix}/transfer_completed.txt")
complete_file.text = "" // This will create a file when it does not exist.
[id, state]
}
| setState(["fastq_output"])
emit:
output_ch
await_ch
}
def get_version(input) {