diff --git a/src/runner/main.nf b/src/runner/main.nf index a97a5bd..5b77f7c 100644 --- a/src/runner/main.nf +++ b/src/runner/main.nf @@ -1,8 +1,14 @@ -def date = new Date().format('yyyyMMdd_hhmmss') +import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.atomic.AtomicBoolean +def date = new Date().format('yyyyMMdd_hhmmss') def viash_config = java.nio.file.Paths.get("${moduleDir}/_viash.yaml") def version = get_version(viash_config) +session = nextflow.Nextflow.getSession() +final service = session.publishDirExecutorService() + + workflow run_wf { take: input_ch @@ -51,13 +57,16 @@ workflow run_wf { state + result + [ to_return: result ] }, ) - | publish.run( - fromState: { id, state -> - println(state.plain_output) + | map {id, state -> def id1 = (state.plain_output) ? id : "${state.run_id}/${date}" def id2 = (state.plain_output) ? id : "${id1}_demultiplex_${version}" - def prefix = (id2 == "run") ? "" : "${id2}/" + def new_state = state + ["prefix": prefix] + [id, new_state] + } + | publish.run( + fromState: { id, state -> + def prefix = state.prefix // These output names are determined by arguments. def fastq_output_1 = "${prefix}${state.fastq_output_workflow}" def sample_qc_output_1 = "${prefix}${state.sample_qc_output_workflow}" @@ -67,18 +76,6 @@ workflow run_wf { def run_information_output_1 = "${prefix}${state.output_run_information.getName()}" println("Publising to ${params.publish_dir}/${prefix}") - - // Create a file to indicate that the publishing (transfer) of files has been completed. - // Multiple items can be added to onCompleteActions; which is required when processing multiple sequencing runs at a time. - // Alternatively setOnComplete could be used to add actions, but that only adds them at the end of the list (which is executed in order). - // The 'completed.txt' file must be created before the onComplete of the integration tests are run, so we need to prepend to the list. - workflow.onCompleteActions.add(0, { - if (workflow.exitStatus == 0) { - def complete_file = file("${params.publish_dir}/${prefix}/transfer_completed.txt") - complete_file.text = "" // This will create a file when it does not exist. - } - }) - [ input: state.output, input_sample_qc: state.output_sample_qc, @@ -92,7 +89,7 @@ workflow run_wf { output_demultiplexer_logs: demultiplexer_logs_output, ] }, - toState: { id, result, state -> [ fastq_output: state.to_return.output ] }, + toState: { id, result, state -> [ "fastq_output": state.to_return.output, "prefix": state.prefix ] }, directives: [ publishDir: [ path: "${params.publish_dir}", @@ -102,8 +99,60 @@ workflow run_wf { ] ) +has_published = new AtomicBoolean(false) + +interval_ch = channel.interval('10s'){ i -> + // Allow this channel to stop generating events based on a later signal + if (has_published.get()) { + return channel.STOP + } + i +} + +await_ch = output_ch + // Wait for demultiplexing processes to be done + | toSortedList() + // Create periodic events in order to check for the publishing to be done + | combine(interval_ch) + | until { event -> + println("Checking if publishing has finished in service ${service}") + def running_tasks = null + if(service instanceof ThreadPoolExecutor) { + def completed_tasks = service.getCompletedTaskCount() + def task_count = service.getTaskCount() + running_tasks = completed_tasks - task_count + } + else if( System.getenv('NXF_ENABLE_VIRTUAL_THREADS') ) { + running_tasks = service.threadCount() + } + else { + error("Publishing service of class ${service.getClass()} is not supported.") + } + + if (running_tasks == 0) { + println("Publishing has finished all current tasks. Continuing execution.") + return true + } + println("Workflow is publishing. Waiting...") + return false + } + | last() + | map{ event -> + // Signal to interval channel to stop generating events. + has_published.compareAndSet(false, true) + return event[0] + } + | map {id, state -> + println("Creating transfer_complete.txt file.") + def complete_file = file("${params.publish_dir}/${state.prefix}/transfer_completed.txt") + complete_file.text = "" // This will create a file when it does not exist. + [id, state] + } + | setState(["fastq_output"]) + + emit: - output_ch + await_ch } def get_version(input) { diff --git a/target/executable/io/interop_summary_to_csv/.config.vsh.yaml b/target/executable/io/interop_summary_to_csv/.config.vsh.yaml index a89f08f..c44de88 100644 --- a/target/executable/io/interop_summary_to_csv/.config.vsh.yaml +++ b/target/executable/io/interop_summary_to_csv/.config.vsh.yaml @@ -160,9 +160,9 @@ build_info: output: "target/executable/io/interop_summary_to_csv" executable: "target/executable/io/interop_summary_to_csv/interop_summary_to_csv" viash_version: "0.9.4" - git_commit: "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805" + git_commit: "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae" git_remote: "https://github.com/viash-hub/demultiplex" - git_tag: "v0.1.1-37-g0d4c458" + git_tag: "v0.1.1-38-g45cd694" package_config: name: "demultiplex" version: "main" diff --git a/target/executable/io/interop_summary_to_csv/interop_summary_to_csv b/target/executable/io/interop_summary_to_csv/interop_summary_to_csv index 549d820..1fb1712 100755 --- a/target/executable/io/interop_summary_to_csv/interop_summary_to_csv +++ b/target/executable/io/interop_summary_to_csv/interop_summary_to_csv @@ -454,9 +454,9 @@ tar -C /tmp/ --no-same-owner --no-same-permissions -xvf /tmp/interop.tar.gz && \ mv /tmp/interop-1.3.1-Linux-GNU/bin/index-summary /tmp/interop-1.3.1-Linux-GNU/bin/summary /usr/local/bin/ LABEL org.opencontainers.image.description="Companion container for running component io interop_summary_to_csv" -LABEL org.opencontainers.image.created="2025-08-05T10:55:23Z" +LABEL org.opencontainers.image.created="2025-08-08T14:01:33Z" LABEL org.opencontainers.image.source="https://github.com/viash-hub/demultiplex" -LABEL org.opencontainers.image.revision="0d4c4584f0d18ab18c21e7d71c43b8deed2c4805" +LABEL org.opencontainers.image.revision="45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae" LABEL org.opencontainers.image.version="main" VIASHDOCKER diff --git a/target/executable/io/publish/.config.vsh.yaml b/target/executable/io/publish/.config.vsh.yaml index cb85b92..c64bda3 100644 --- a/target/executable/io/publish/.config.vsh.yaml +++ b/target/executable/io/publish/.config.vsh.yaml @@ -222,9 +222,9 @@ build_info: output: "target/executable/io/publish" executable: "target/executable/io/publish/publish" viash_version: "0.9.4" - git_commit: "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805" + git_commit: "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae" git_remote: "https://github.com/viash-hub/demultiplex" - git_tag: "v0.1.1-37-g0d4c458" + git_tag: "v0.1.1-38-g45cd694" package_config: name: "demultiplex" version: "main" diff --git a/target/executable/io/publish/publish b/target/executable/io/publish/publish index 8bee7da..84095ce 100755 --- a/target/executable/io/publish/publish +++ b/target/executable/io/publish/publish @@ -450,9 +450,9 @@ RUN apt-get update && \ rm -rf /var/lib/apt/lists/* LABEL org.opencontainers.image.description="Companion container for running component io publish" -LABEL org.opencontainers.image.created="2025-08-05T10:55:23Z" +LABEL org.opencontainers.image.created="2025-08-08T14:01:33Z" LABEL org.opencontainers.image.source="https://github.com/viash-hub/demultiplex" -LABEL org.opencontainers.image.revision="0d4c4584f0d18ab18c21e7d71c43b8deed2c4805" +LABEL org.opencontainers.image.revision="45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae" LABEL org.opencontainers.image.version="main" VIASHDOCKER diff --git a/target/executable/io/untar/.config.vsh.yaml b/target/executable/io/untar/.config.vsh.yaml index cabf8c0..1efe2b5 100644 --- a/target/executable/io/untar/.config.vsh.yaml +++ b/target/executable/io/untar/.config.vsh.yaml @@ -159,9 +159,9 @@ build_info: output: "target/executable/io/untar" executable: "target/executable/io/untar/untar" viash_version: "0.9.4" - git_commit: "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805" + git_commit: "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae" git_remote: "https://github.com/viash-hub/demultiplex" - git_tag: "v0.1.1-37-g0d4c458" + git_tag: "v0.1.1-38-g45cd694" package_config: name: "demultiplex" version: "main" diff --git a/target/executable/io/untar/untar b/target/executable/io/untar/untar index bab2a13..fd96d8f 100755 --- a/target/executable/io/untar/untar +++ b/target/executable/io/untar/untar @@ -450,9 +450,9 @@ RUN apt-get update && \ rm -rf /var/lib/apt/lists/* LABEL org.opencontainers.image.description="Companion container for running component io untar" -LABEL org.opencontainers.image.created="2025-08-05T10:55:23Z" +LABEL org.opencontainers.image.created="2025-08-08T14:01:33Z" LABEL org.opencontainers.image.source="https://github.com/viash-hub/demultiplex" -LABEL org.opencontainers.image.revision="0d4c4584f0d18ab18c21e7d71c43b8deed2c4805" +LABEL org.opencontainers.image.revision="45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae" LABEL org.opencontainers.image.version="main" VIASHDOCKER diff --git a/target/nextflow/dataflow/combine_samples/.config.vsh.yaml b/target/nextflow/dataflow/combine_samples/.config.vsh.yaml index 5dd7843..4eb2a2c 100644 --- a/target/nextflow/dataflow/combine_samples/.config.vsh.yaml +++ b/target/nextflow/dataflow/combine_samples/.config.vsh.yaml @@ -168,9 +168,9 @@ build_info: output: "target/nextflow/dataflow/combine_samples" executable: "target/nextflow/dataflow/combine_samples/main.nf" viash_version: "0.9.4" - git_commit: "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805" + git_commit: "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae" git_remote: "https://github.com/viash-hub/demultiplex" - git_tag: "v0.1.1-37-g0d4c458" + git_tag: "v0.1.1-38-g45cd694" package_config: name: "demultiplex" version: "main" diff --git a/target/nextflow/dataflow/combine_samples/main.nf b/target/nextflow/dataflow/combine_samples/main.nf index 60c6c94..9aa4f8a 100644 --- a/target/nextflow/dataflow/combine_samples/main.nf +++ b/target/nextflow/dataflow/combine_samples/main.nf @@ -3235,9 +3235,9 @@ meta = [ "engine" : "native|native", "output" : "target/nextflow/dataflow/combine_samples", "viash_version" : "0.9.4", - "git_commit" : "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805", + "git_commit" : "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae", "git_remote" : "https://github.com/viash-hub/demultiplex", - "git_tag" : "v0.1.1-37-g0d4c458" + "git_tag" : "v0.1.1-38-g45cd694" }, "package_config" : { "name" : "demultiplex", diff --git a/target/nextflow/dataflow/gather_fastqs_and_validate/.config.vsh.yaml b/target/nextflow/dataflow/gather_fastqs_and_validate/.config.vsh.yaml index cb6dae6..8bba4d4 100644 --- a/target/nextflow/dataflow/gather_fastqs_and_validate/.config.vsh.yaml +++ b/target/nextflow/dataflow/gather_fastqs_and_validate/.config.vsh.yaml @@ -159,9 +159,9 @@ build_info: output: "target/nextflow/dataflow/gather_fastqs_and_validate" executable: "target/nextflow/dataflow/gather_fastqs_and_validate/main.nf" viash_version: "0.9.4" - git_commit: "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805" + git_commit: "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae" git_remote: "https://github.com/viash-hub/demultiplex" - git_tag: "v0.1.1-37-g0d4c458" + git_tag: "v0.1.1-38-g45cd694" package_config: name: "demultiplex" version: "main" diff --git a/target/nextflow/dataflow/gather_fastqs_and_validate/main.nf b/target/nextflow/dataflow/gather_fastqs_and_validate/main.nf index e221cbf..3a86f80 100644 --- a/target/nextflow/dataflow/gather_fastqs_and_validate/main.nf +++ b/target/nextflow/dataflow/gather_fastqs_and_validate/main.nf @@ -3232,9 +3232,9 @@ meta = [ "engine" : "native|native", "output" : "target/nextflow/dataflow/gather_fastqs_and_validate", "viash_version" : "0.9.4", - "git_commit" : "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805", + "git_commit" : "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae", "git_remote" : "https://github.com/viash-hub/demultiplex", - "git_tag" : "v0.1.1-37-g0d4c458" + "git_tag" : "v0.1.1-38-g45cd694" }, "package_config" : { "name" : "demultiplex", diff --git a/target/nextflow/demultiplex/.config.vsh.yaml b/target/nextflow/demultiplex/.config.vsh.yaml index 4085773..2b98e93 100644 --- a/target/nextflow/demultiplex/.config.vsh.yaml +++ b/target/nextflow/demultiplex/.config.vsh.yaml @@ -268,9 +268,9 @@ build_info: output: "target/nextflow/demultiplex" executable: "target/nextflow/demultiplex/main.nf" viash_version: "0.9.4" - git_commit: "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805" + git_commit: "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae" git_remote: "https://github.com/viash-hub/demultiplex" - git_tag: "v0.1.1-37-g0d4c458" + git_tag: "v0.1.1-38-g45cd694" dependencies: - "target/nextflow/io/untar" - "target/nextflow/dataflow/gather_fastqs_and_validate" diff --git a/target/nextflow/demultiplex/main.nf b/target/nextflow/demultiplex/main.nf index 1280069..86a995c 100644 --- a/target/nextflow/demultiplex/main.nf +++ b/target/nextflow/demultiplex/main.nf @@ -3380,9 +3380,9 @@ meta = [ "engine" : "native|native", "output" : "target/nextflow/demultiplex", "viash_version" : "0.9.4", - "git_commit" : "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805", + "git_commit" : "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae", "git_remote" : "https://github.com/viash-hub/demultiplex", - "git_tag" : "v0.1.1-37-g0d4c458" + "git_tag" : "v0.1.1-38-g45cd694" }, "package_config" : { "name" : "demultiplex", diff --git a/target/nextflow/detect_demultiplexer/.config.vsh.yaml b/target/nextflow/detect_demultiplexer/.config.vsh.yaml index 12941ab..d6590c1 100644 --- a/target/nextflow/detect_demultiplexer/.config.vsh.yaml +++ b/target/nextflow/detect_demultiplexer/.config.vsh.yaml @@ -168,9 +168,9 @@ build_info: output: "target/nextflow/detect_demultiplexer" executable: "target/nextflow/detect_demultiplexer/main.nf" viash_version: "0.9.4" - git_commit: "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805" + git_commit: "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae" git_remote: "https://github.com/viash-hub/demultiplex" - git_tag: "v0.1.1-37-g0d4c458" + git_tag: "v0.1.1-38-g45cd694" package_config: name: "demultiplex" version: "main" diff --git a/target/nextflow/detect_demultiplexer/main.nf b/target/nextflow/detect_demultiplexer/main.nf index c8f7fa8..de8844e 100644 --- a/target/nextflow/detect_demultiplexer/main.nf +++ b/target/nextflow/detect_demultiplexer/main.nf @@ -3224,9 +3224,9 @@ meta = [ "engine" : "native|native", "output" : "target/nextflow/detect_demultiplexer", "viash_version" : "0.9.4", - "git_commit" : "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805", + "git_commit" : "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae", "git_remote" : "https://github.com/viash-hub/demultiplex", - "git_tag" : "v0.1.1-37-g0d4c458" + "git_tag" : "v0.1.1-38-g45cd694" }, "package_config" : { "name" : "demultiplex", diff --git a/target/nextflow/io/interop_summary_to_csv/.config.vsh.yaml b/target/nextflow/io/interop_summary_to_csv/.config.vsh.yaml index d32529d..cb3093c 100644 --- a/target/nextflow/io/interop_summary_to_csv/.config.vsh.yaml +++ b/target/nextflow/io/interop_summary_to_csv/.config.vsh.yaml @@ -160,9 +160,9 @@ build_info: output: "target/nextflow/io/interop_summary_to_csv" executable: "target/nextflow/io/interop_summary_to_csv/main.nf" viash_version: "0.9.4" - git_commit: "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805" + git_commit: "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae" git_remote: "https://github.com/viash-hub/demultiplex" - git_tag: "v0.1.1-37-g0d4c458" + git_tag: "v0.1.1-38-g45cd694" package_config: name: "demultiplex" version: "main" diff --git a/target/nextflow/io/interop_summary_to_csv/main.nf b/target/nextflow/io/interop_summary_to_csv/main.nf index 2a8793d..b300bf0 100644 --- a/target/nextflow/io/interop_summary_to_csv/main.nf +++ b/target/nextflow/io/interop_summary_to_csv/main.nf @@ -3233,9 +3233,9 @@ meta = [ "engine" : "docker|native", "output" : "target/nextflow/io/interop_summary_to_csv", "viash_version" : "0.9.4", - "git_commit" : "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805", + "git_commit" : "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae", "git_remote" : "https://github.com/viash-hub/demultiplex", - "git_tag" : "v0.1.1-37-g0d4c458" + "git_tag" : "v0.1.1-38-g45cd694" }, "package_config" : { "name" : "demultiplex", diff --git a/target/nextflow/io/publish/.config.vsh.yaml b/target/nextflow/io/publish/.config.vsh.yaml index 733ba84..15a0ec3 100644 --- a/target/nextflow/io/publish/.config.vsh.yaml +++ b/target/nextflow/io/publish/.config.vsh.yaml @@ -222,9 +222,9 @@ build_info: output: "target/nextflow/io/publish" executable: "target/nextflow/io/publish/main.nf" viash_version: "0.9.4" - git_commit: "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805" + git_commit: "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae" git_remote: "https://github.com/viash-hub/demultiplex" - git_tag: "v0.1.1-37-g0d4c458" + git_tag: "v0.1.1-38-g45cd694" package_config: name: "demultiplex" version: "main" diff --git a/target/nextflow/io/publish/main.nf b/target/nextflow/io/publish/main.nf index f3e07dc..05290ec 100644 --- a/target/nextflow/io/publish/main.nf +++ b/target/nextflow/io/publish/main.nf @@ -3302,9 +3302,9 @@ meta = [ "engine" : "docker|native", "output" : "target/nextflow/io/publish", "viash_version" : "0.9.4", - "git_commit" : "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805", + "git_commit" : "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae", "git_remote" : "https://github.com/viash-hub/demultiplex", - "git_tag" : "v0.1.1-37-g0d4c458" + "git_tag" : "v0.1.1-38-g45cd694" }, "package_config" : { "name" : "demultiplex", diff --git a/target/nextflow/io/untar/.config.vsh.yaml b/target/nextflow/io/untar/.config.vsh.yaml index f404e88..774b511 100644 --- a/target/nextflow/io/untar/.config.vsh.yaml +++ b/target/nextflow/io/untar/.config.vsh.yaml @@ -159,9 +159,9 @@ build_info: output: "target/nextflow/io/untar" executable: "target/nextflow/io/untar/main.nf" viash_version: "0.9.4" - git_commit: "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805" + git_commit: "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae" git_remote: "https://github.com/viash-hub/demultiplex" - git_tag: "v0.1.1-37-g0d4c458" + git_tag: "v0.1.1-38-g45cd694" package_config: name: "demultiplex" version: "main" diff --git a/target/nextflow/io/untar/main.nf b/target/nextflow/io/untar/main.nf index 8487fa9..166c271 100644 --- a/target/nextflow/io/untar/main.nf +++ b/target/nextflow/io/untar/main.nf @@ -3232,9 +3232,9 @@ meta = [ "engine" : "docker|native", "output" : "target/nextflow/io/untar", "viash_version" : "0.9.4", - "git_commit" : "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805", + "git_commit" : "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae", "git_remote" : "https://github.com/viash-hub/demultiplex", - "git_tag" : "v0.1.1-37-g0d4c458" + "git_tag" : "v0.1.1-38-g45cd694" }, "package_config" : { "name" : "demultiplex", diff --git a/target/nextflow/runner/.config.vsh.yaml b/target/nextflow/runner/.config.vsh.yaml index 2fb2360..117dcc9 100644 --- a/target/nextflow/runner/.config.vsh.yaml +++ b/target/nextflow/runner/.config.vsh.yaml @@ -214,9 +214,9 @@ build_info: output: "target/nextflow/runner" executable: "target/nextflow/runner/main.nf" viash_version: "0.9.4" - git_commit: "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805" + git_commit: "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae" git_remote: "https://github.com/viash-hub/demultiplex" - git_tag: "v0.1.1-37-g0d4c458" + git_tag: "v0.1.1-38-g45cd694" dependencies: - "target/nextflow/demultiplex" - "target/nextflow/io/publish" diff --git a/target/nextflow/runner/main.nf b/target/nextflow/runner/main.nf index cd372f4..413186a 100644 --- a/target/nextflow/runner/main.nf +++ b/target/nextflow/runner/main.nf @@ -3296,9 +3296,9 @@ meta = [ "engine" : "native|native", "output" : "target/nextflow/runner", "viash_version" : "0.9.4", - "git_commit" : "0d4c4584f0d18ab18c21e7d71c43b8deed2c4805", + "git_commit" : "45cd6947ef96ae0dd6c38c0e9a791797d68bf4ae", "git_remote" : "https://github.com/viash-hub/demultiplex", - "git_tag" : "v0.1.1-37-g0d4c458" + "git_tag" : "v0.1.1-38-g45cd694" }, "package_config" : { "name" : "demultiplex", @@ -3344,11 +3344,17 @@ include { publish } from "${meta.resources_dir}/../../nextflow/io/publish/main.n // inner workflow // user-provided Nextflow code -def date = new Date().format('yyyyMMdd_hhmmss') +import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.atomic.AtomicBoolean +def date = new Date().format('yyyyMMdd_hhmmss') def viash_config = java.nio.file.Paths.get("${moduleDir}/_viash.yaml") def version = get_version(viash_config) +session = nextflow.Nextflow.getSession() +final service = session.publishDirExecutorService() + + workflow run_wf { take: input_ch @@ -3397,13 +3403,16 @@ workflow run_wf { state + result + [ to_return: result ] }, ) - | publish.run( - fromState: { id, state -> - println(state.plain_output) + | map {id, state -> def id1 = (state.plain_output) ? id : "${state.run_id}/${date}" def id2 = (state.plain_output) ? id : "${id1}_demultiplex_${version}" - def prefix = (id2 == "run") ? "" : "${id2}/" + def new_state = state + ["prefix": prefix] + [id, new_state] + } + | publish.run( + fromState: { id, state -> + def prefix = state.prefix // These output names are determined by arguments. def fastq_output_1 = "${prefix}${state.fastq_output_workflow}" def sample_qc_output_1 = "${prefix}${state.sample_qc_output_workflow}" @@ -3413,18 +3422,6 @@ workflow run_wf { def run_information_output_1 = "${prefix}${state.output_run_information.getName()}" println("Publising to ${params.publish_dir}/${prefix}") - - // Create a file to indicate that the publishing (transfer) of files has been completed. - // Multiple items can be added to onCompleteActions; which is required when processing multiple sequencing runs at a time. - // Alternatively setOnComplete could be used to add actions, but that only adds them at the end of the list (which is executed in order). - // The 'completed.txt' file must be created before the onComplete of the integration tests are run, so we need to prepend to the list. - workflow.onCompleteActions.add(0, { - if (workflow.exitStatus == 0) { - def complete_file = file("${params.publish_dir}/${prefix}/transfer_completed.txt") - complete_file.text = "" // This will create a file when it does not exist. - } - }) - [ input: state.output, input_sample_qc: state.output_sample_qc, @@ -3438,7 +3435,7 @@ workflow run_wf { output_demultiplexer_logs: demultiplexer_logs_output, ] }, - toState: { id, result, state -> [ fastq_output: state.to_return.output ] }, + toState: { id, result, state -> [ "fastq_output": state.to_return.output, "prefix": state.prefix ] }, directives: [ publishDir: [ path: "${params.publish_dir}", @@ -3448,8 +3445,60 @@ workflow run_wf { ] ) +has_published = new AtomicBoolean(false) + +interval_ch = channel.interval('10s'){ i -> + // Allow this channel to stop generating events based on a later signal + if (has_published.get()) { + return channel.STOP + } + i +} + +await_ch = output_ch + // Wait for demultiplexing processes to be done + | toSortedList() + // Create periodic events in order to check for the publishing to be done + | combine(interval_ch) + | until { event -> + println("Checking if publishing has finished in service ${service}") + def running_tasks = null + if(service instanceof ThreadPoolExecutor) { + def completed_tasks = service.getCompletedTaskCount() + def task_count = service.getTaskCount() + running_tasks = completed_tasks - task_count + } + else if( System.getenv('NXF_ENABLE_VIRTUAL_THREADS') ) { + running_tasks = service.threadCount() + } + else { + error("Publishing service of class ${service.getClass()} is not supported.") + } + + if (running_tasks == 0) { + println("Publishing has finished all current tasks. Continuing execution.") + return true + } + println("Workflow is publishing. Waiting...") + return false + } + | last() + | map{ event -> + // Signal to interval channel to stop generating events. + has_published.compareAndSet(false, true) + return event[0] + } + | map {id, state -> + println("Creating transfer_complete.txt file.") + def complete_file = file("${params.publish_dir}/${state.prefix}/transfer_completed.txt") + complete_file.text = "" // This will create a file when it does not exist. + [id, state] + } + | setState(["fastq_output"]) + + emit: - output_ch + await_ch } def get_version(input) {