Nextflow Workflow



This is part 7 of 14 of an Introduction to Nextflow.


Create a working directory for this episode and move into it:

mkdir workflow
cd workflow

Our previous episodes have shown us how to parameterize workflows using params, move data around a workflow using channels, and define individual tasks using processes. In this episode, we will cover how to connect multiple processes to create a workflow.


Workflow definition

Implicit workflow: a workflow block that does not declare a name is treated as the main workflow and is executed implicitly. It is therefore the entry point of the pipeline.

Create a new file named workflow.nf, add the following code, and run it with nextflow run workflow.nf:

//workflow.nf
nextflow.enable.dsl=2

// Initialize required parameters
params.outdir = 'results'
params.genome = "/workspace/nextflow_tutorial/data/ref_genome/ecoli_rel606.fasta"
params.reads = "/workspace/nextflow_tutorial/data/trimmed_fastq/SRR2584863_{1,2}.trim.fastq.gz"

workflow {

    // Create channel from path for Reference Genome
    ref_ch = Channel.fromPath( params.genome, checkIfExists: true )
    // Create channel from file-pairs for Input Fastq files
    reads_ch = Channel.fromFilePairs( params.reads, checkIfExists: true )

    //index process takes 1 input channel as an argument
    BWA_INDEX( ref_ch )

    //bwa align process takes 2 input channels as arguments
    BWA_ALIGN( BWA_INDEX.out, reads_ch )
}

/*
 * Build the BWA index for the reference genome.
 */
process BWA_INDEX {
    tag {"BWA_INDEX ${genome}"}
    label 'process_low'

    publishDir "${params.outdir}/bwa_index", mode: 'copy'

    input:
    path( genome )

    output:
    tuple path( genome ), path( "*" )

    script:
    """
    bwa index ${genome}
    """
}

/*
 * Align reads to reference genome & create BAM file.
 */
process BWA_ALIGN {
    tag {"BWA_ALIGN ${sample_id}"}
    label 'process_medium'

    publishDir "${params.outdir}/bwa_align", mode: 'copy'

    input:
    tuple path( genome ), path( "*" )
    tuple val( sample_id ), path( reads )

    output:
    tuple val( sample_id ), path( "${sample_id}.aligned.bam" )

    script:
    """
    INDEX=`find -L ./ -name "*.amb" | sed 's/.amb//'`
    bwa mem \$INDEX ${reads} > ${sample_id}.aligned.sam
    samtools view -S -b ${sample_id}.aligned.sam > ${sample_id}.aligned.bam
    """
}
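The INDEX=... line in BWA_ALIGN's script locates the BWA index among the files staged through path( "*" ): it finds the .amb file and strips that extension, leaving the prefix that bwa mem expects. The bash sketch below reproduces just that step outside Nextflow so you can see what $INDEX ends up holding. It assumes that empty placeholder files in a temporary directory can stand in for the reference and the five files written by bwa index (.amb, .ann, .bwt, .pac, .sa); neither bwa nor Nextflow is run.

```shell
#!/usr/bin/env bash
# Sketch: reproduce the index-prefix lookup used in BWA_ALIGN's script block.
# Assumption: empty placeholder files stand in for the real reference and the
# five files written by `bwa index`; bwa itself is not run here.
set -euo pipefail

workdir=$(mktemp -d)
trap 'rm -rf "$workdir"' EXIT
cd "$workdir"

# Files as they might be staged into a BWA_ALIGN task directory
touch ecoli_rel606.fasta
for ext in amb ann bwt pac sa; do
    touch "ecoli_rel606.fasta.${ext}"
done

# Same pipeline as in the process: find the .amb file, then strip ".amb"
# (in the sed pattern "." matches any character, which is harmless here)
INDEX=$(find -L ./ -name "*.amb" | sed 's/.amb//')

# Print the prefix that would be handed to `bwa mem`
echo "INDEX=${INDEX}"
```

In plain bash, $( ... ) is equivalent to the backticks used in the process. Inside the Nextflow script string, the bash variable must be written as \$INDEX so that Nextflow leaves it for bash, whereas ${reads} and ${sample_id} are substituted by Nextflow before the script runs.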

Process Named Output

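Instead of relying on the position of an output channel, you can name it with the emit: option in the process's output block and then refer to it as PROCESS_NAME.out.name. Modify workflow.nf as shown below, which names BWA_INDEX's output bwa_index and BWA_ALIGN's output aligned_bam, and run nextflow run workflow.nf: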

//workflow.nf
nextflow.enable.dsl=2

// Initialize required parameters
params.outdir = 'results'
params.genome = "/workspace/nextflow_tutorial/data/ref_genome/ecoli_rel606.fasta"
params.reads = "/workspace/nextflow_tutorial/data/trimmed_fastq/SRR2584863_{1,2}.trim.fastq.gz"

workflow {

    // Create channel from path for Reference Genome
    ref_ch = Channel.fromPath( params.genome, checkIfExists: true )
    // Create channel from file-pairs for Input Fastq files
    reads_ch = Channel.fromFilePairs( params.reads, checkIfExists: true )

    //index process takes 1 input channel as an argument
    BWA_INDEX( ref_ch )

    //bwa align process takes 2 input channels as arguments
    BWA_ALIGN( BWA_INDEX.out.bwa_index, reads_ch )
}

/*
 * Build the BWA index for the reference genome.
 */
process BWA_INDEX {
    tag {"BWA_INDEX ${genome}"}
    label 'process_low'

    publishDir "${params.outdir}/bwa_index", mode: 'copy'

    input:
    path( genome )

    output:
    tuple path( genome ), path( "*" ), emit: bwa_index

    script:
    """
    bwa index ${genome}
    """
}

/*
 * Align reads to reference genome & create BAM file.
 */
process BWA_ALIGN {
    tag {"BWA_ALIGN ${sample_id}"}
    label 'process_medium'

    publishDir "${params.outdir}/bwa_align", mode: 'copy'

    input:
    tuple path( genome ), path( "*" )
    tuple val( sample_id ), path( reads )

    output:
    tuple val( sample_id ), path( "${sample_id}.aligned.bam" ), emit: aligned_bam

    script:
    """
    INDEX=`find -L ./ -name "*.amb" | sed 's/.amb//'`
    bwa mem \$INDEX ${reads} > ${sample_id}.aligned.sam
    samtools view -S -b ${sample_id}.aligned.sam > ${sample_id}.aligned.bam
    """
}

Quick Recap

  • A Nextflow workflow is defined by invoking processes inside the workflow scope.
  • A process is invoked like a function inside the workflow scope, with its input channels passed as arguments, e.g., BWA_INDEX( ref_ch ).
  • Process outputs are accessed through the process's out attribute, e.g., BWA_INDEX.out.
  • When a process has multiple outputs, each one can be accessed by index, e.g., BWA_INDEX.out[0], or by the name assigned with emit:, e.g., BWA_INDEX.out.bwa_index.
