Nextflow workshop

Author

Marcus Wenne


Goals of This Tutorial

  1. Gain a clear understanding of how Nextflow functions.
  2. Learn to create your own pipeline.
  3. Discover how to use Nextflow operators to simplify your coding.
  4. Explore the integration of containers with Nextflow.
  5. Learn the steps to launch a job on the Vera cluster.

Layout of This Tutorial

In this tutorial, you will work from pre-written commands and examples: we will provide practical examples and helpful hints, but you will be crafting much of the Nextflow code on your own. Use the Nextflow documentation as your primary resource. Of course, feel free to ask as many questions as you need!

We’ll begin with a brief presentation on Nextflow’s mechanics, followed by hands-on coding sessions.

Processes and channels

Processes

In practice a Nextflow pipeline script is made by joining together different processes. Each process can be written in any scripting language that can be executed by the Linux platform (Bash, Perl, Ruby, Python, etc.).

Processes are executed independently and are isolated from each other, i.e. they do not share a common (writable) state. The only way they can communicate is via asynchronous FIFO queues, called channels in Nextflow. The fact that the flow of data is asynchronous means that the output files might not arrive in the same order as the input files; i.e., Nextflow does not care about the order of the files, only that they flow between processes in the proper order.

Any process can define one or more channels as input and output. The interaction between these processes, and ultimately the pipeline execution flow itself, is implicitly defined by these input and output declarations.

Source

In Nextflow, you decide what kind of data each process handles. This includes defining inputs and outputs. Inputs and outputs can vary – they might be a simple variable, a file path, standard output, etc. This versatility is what makes channels in Nextflow so powerful. You can seamlessly integrate file paths with variables (like sample names), as you’ll discover in this tutorial.

The inputs and outputs for each process are defined within the Process itself. But how do you link the output of one process to the input of another? Let’s look at an example workflow where this connection is made.

In Nextflow, processes are defined within curly braces {}. Each process specifies its inputs, outputs, and the command to be executed.

process {NAME OF PROCESS} {
  input:
    {INPUT FILES}

  output:
    {OUTPUT FILES}

  script:
    """
    {SCRIPT TO RUN}
    """
}

Example processes

Process 1: Initial Data Processing

process Process_1 {
    input:
    val VAL_1
    path INPUT_1

    output:
    val VAL_1
    path "output_file_1.txt"

    shell:
    """
    program_x --input ${INPUT_1} --output output_file_1.txt
    """
}

Explanation:

  • Inputs: val VAL_1 and path INPUT_1 are placeholders for the input data. The actual data these variables represent will be assigned later in the workflow.

  • Outputs: The process outputs the same val VAL_1 it received and creates a new file, output_file_1.txt.

  • Shell Command: Executes program_x using the input file and generates an output file. Writing ${INPUT_1} allows the Shell command to access that variable.

Process 2: Further Data Manipulation

process Process_2 {
    input:
    val VAL_1
    path OUTPUT_1

    output:
    path "output_file_2.txt"

    shell:
    """
    program_y --input ${OUTPUT_1} --output output_file_2.txt
    """
}

Explanation:

  • Inputs: This process takes the output from Process_1. path OUTPUT_1 is the output file from Process_1.

  • Output: Generates a new file, output_file_2.txt.

  • Shell Command: Runs program_y using the output from Process_1.

Workflow: Connecting Processes

workflow {
    process_1 = Process_1("human_gut_1", "/cephyr/NOBACKUP/groups/bbt045_2024/turorial_Nextflow/data/human_gut_1_R1.fastq.gz")
    Process_2(process_1[0], process_1[1])
}

Explanation:

  • Variable process_1: This is a user-defined variable that stores the outputs from Process_1. The name is arbitrary but should be descriptive.

  • Connecting Processes: Process_2 takes the outputs from process_1 (process_1[0] and process_1[1]) as its inputs, linking the two processes in the workflow.

In Nextflow, each process has an output: section where we define what it produces and an input: section for what it needs to start. These sections, along with the workflow structure, are what make Nextflow so efficient and powerful. Within the workflow, variables like process_1 hold the outputs from their corresponding processes in a list format. For example, process_1[0] accesses the first output declared in Process_1 (VAL_1). To learn more about how workflows function, check out this Workflow Guide.

Nextflow executor

In the Nextflow framework architecture, the executor is the component that determines the system where a pipeline process is run and supervises its execution.

The executor provides an abstraction between the pipeline processes and the underlying execution system. This allows you to write the pipeline functional logic independently from the actual processing platform.

In other words, you can write your pipeline script once and have it running on your computer, a cluster resource manager, or the cloud — simply change the executor definition in the Nextflow configuration file.

Source

Slurm

The slurm executor allows you to run your pipeline script using the SLURM resource manager.

Nextflow manages each process as a separate job that is submitted to the cluster using the sbatch command.

The pipeline must be launched from a node where the sbatch command is available, which is typically the cluster login node.

To enable the SLURM executor, set process.executor = 'slurm' in the nextflow.config file.

Source

The administrators of the Vera cluster ask that you use their TMPDIR for executing processes to reduce I/O on the main storage. What does this mean? It means that the input files for a process are symlinked to this directory and that all temporary output files are written here. The final output is then copied to permanent storage, and all temporary files are deleted. You activate this by setting process.scratch = true in the config file (already included for you).

Tips and tricks

Operators

Operators are helper functions which can be very useful, especially when directing data streams between processes. They can be used to combine, split, and much more. Here is the documentation about operators. At the top of that page, the most useful and commonly used operators are listed.

View

Sometimes it might be difficult to know which files flow through each channel. If so, you can use the view operator to print the content of a channel.

Collect

Normally, Nextflow provides a process with one set of files per sample. But what if a process needs the files produced for all samples? This can be useful in a summary process such as MultiQC, which combines all quality reports from TrimGalore!. Can you figure out how to use the collect operator? A generic sketch is shown below.
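
Here is a minimal, generic sketch (not part of the pipeline; the file names are made up for illustration) of how view and collect behave:

workflow {
    // Each value normally flows through the channel one at a time
    Channel.of('report_1.zip', 'report_2.zip', 'report_3.zip')
        .view()                          // prints every item as it passes through
        .collect()                       // gathers all items into a single list, emitted once
        .view { "collected: ${it}" }     // prints: collected: [report_1.zip, report_2.zip, report_3.zip]
}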

Description of the Pipeline

In this tutorial, you’ll be recreating a version of a pipeline used for determining the composition of antibiotic resistance genes and taxonomy in a metagenomic sample. We’ll provide the commands for each process, and guide you on the order in which they should be executed. Additionally, we’ll offer hints on linking channels to processes. If anything is unclear or if you have questions at any point, please don’t hesitate to ask!

flowchart TD

        p7[Build_ResFinder_db]

        p0{FASTQ files}
        p1[TrimGalore]

        p14[Diamond]
        p15[MetaxaQR]
        p17[MetaxaQR_ttt]
        p20[MetaxaQR_dc]
        p24[Gene_Normalization]

        p4[MultiQC]
 
    p0 --> p1
    p1 --> p14
    p1 --> p4
    p1 --> p15
    p7 --> p14
    p14 --> p24
    p15 --> p17
    p17 --> p20
    p17 -->p24

Each process

FASTQ files

This is technically not a process, but part of the workflow. The primary input for this pipeline is FASTQ files. To efficiently add these files to a channel, we’ll use the fromFilePairs operator. This operator creates a tuple for each pair of FASTQ files: [sample_id, [R1, R2]]. This approach simplifies tracking which sample each read belongs to and ensures that the pairs of R1 and R2 files are correctly associated. Learn more about using this operator here.
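
As a sketch (assuming params.files holds a glob such as '/path/to/data/*_R{1,2}.fastq.gz'; the exact pattern depends on how your files are named), the channel could be created inside the workflow block like this:

// Sketch: build a channel of [sample_id, [R1, R2]] tuples from paired FASTQ files
reads_ch = Channel.fromFilePairs(params.files, checkIfExists: true)
reads_ch.view()   // e.g. [human_gut_1, [.../human_gut_1_R1.fastq.gz, .../human_gut_1_R2.fastq.gz]]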

Build_ResFinder_db

ResFinder is an extensively used, manually curated database. It contains antibiotic resistance genes, particularly those found in mobile genetic elements.

This script:

  • Downloads the database in DNA format by cloning it from GitHub.
  • Creates a single file with all antibiotic resistance genes.
  • prodigal is used to identify the coding part of each gene and translate it into amino acids.
  • sed is used to remove unnecessary characters added by prodigal.
  • ResFinder can contain genes which are very similar to each other. cd-hit clusters them at 95% identity to create a non-redundant database, where diamond will have a greater likelihood of differentiating the genes from each other.
  • diamond makedb is used to convert the ResFinder database to a format that diamond can use.

Code

    # Download ResFinder from github
    git clone https://git@bitbucket.org/genomicepidemiology/resfinder_db.git
   
    # Combines  individual ARG files 
    cat resfinder_db/*.fsa > Combined_Resfinder.fsa

    # Remove unnecessary dir
    mv resfinder_db/* .
    rm -r resfinder_db/
    
    # Translates DNA to protein
    prodigal -i Combined_Resfinder.fsa -a Combined_Resfinder.translations.faa -p meta
    
    # Removes characters added by prodigal during translation
    sed -i 's/\\*//g' Combined_Resfinder.translations.faa
    sed -i 's/ .*\$//' Combined_Resfinder.translations.faa

    # Cluster db at 95% identity
    cd-hit -i Combined_Resfinder.translations.faa -o Combined_Resfinder.translations_cluster_95.fasta -c 0.95 -n 5 -M 0 -d 1000 -T 1

    # Create Diamond db
    diamond makedb --in Combined_Resfinder.translations_cluster_95.fasta -d ResFinderARGs

Flag to specify nr CPUs

None

Data flow

    input: None

    output: 
    ResFinderARGs.dmnd
    Combined_Resfinder.translations.faa
    phenotypes.txt
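
Since this process has no input, you can simply leave out the input block. A possible skeleton (the script body is the code above; the placeholder follows the same style as the template earlier in this tutorial) could look like this:

process Build_ResFinder_db {
    output:
    path "ResFinderARGs.dmnd"
    path "Combined_Resfinder.translations.faa"
    path "phenotypes.txt"

    shell:
    """
    {SCRIPT FROM THE CODE BLOCK ABOVE}
    """
}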

TrimGalore

This tool performs quality trimming of the reads.

  • Removes low-quality and too-short sequences
  • Removes sequencing adapters
  • Outputs quality reports

Code

trim_galore --paired -j 5 --fastqc ${ENTER_READ_R1} ${ENTER_READ_R2}

Flag to specify nr CPUs

-j

Input and output

Input:
sample_id and reads

Output:
sample id, *R1*val*.fq.gz and *R2*val*.fq.gz # Write this in such a way that it captures both R1 and R2 in one glob pattern
*${sample_id}*fastqc.zip
*${sample_id}*trimming_report.txt

You should write the glob pattern in such a way that the first output contains both the R1 and R2 files.
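
As a hint, a sketch of such an output block (assuming both validated read files contain 'val' in their names, which is what Trim Galore produces) could be:

output:
tuple val(sample_id), path("*val*.fq.gz")      // one glob pattern that matches both the R1 and R2 validated reads
path "*${sample_id}*fastqc.zip"
path "*${sample_id}*trimming_report.txt"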

Diamond

This tool is a faster version of BLASTX (100x to 1,000x faster). It translates the DNA reads into the six possible amino acid reading frames and aligns them to the amino acid database.

Code

diamond blastx --query ${R1_read} --db  ${ENTER_DB} -o ${ENTER_SAMPLE_ID}.tsv  --id 90 --query-cover 80  --threads 5 --outfmt 6 qseqid sseqid pident evalue length slen

Flag to specify nr CPUs

--threads

Input and output

Input:
sample id and trimmed reads
ResFinderARGs.dmnd

Output:
{SAMPLE_ID}.tsv

MetaxaQR

MetaxaQR is a tool that determines the taxonomic composition of a sample. It uses HMMs to identify reads from the 16S rRNA gene. This gene contains regions that are conserved across bacteria, making it easy to identify, as well as hypervariable regions that make it possible to distinguish between taxa.

This script:

  • Unzips the read files
  • Performs taxonomic classification
  • Removes the unzipped metagenomic reads
  • Gzips the MetaxaQR fasta output

Code

# Unzip metagenomes
zcat ${READS_R1} > read_R1.fastq &
zcat ${READS_R2} > read_R2.fastq &

# Wait for the background processes to finish
wait

# Run MetaxaQR
metaxaQR -1 read_R1.fastq -2 read_R2.fastq -o ${SAMPLE_ID} --cpu 5 -g SSU -d /MetaxaQR/metaxaQR_db/SSU/mqr

# Remove unzipped metagenomes
rm read_R*.fastq 

# Zip fasta output
pigz -p 5 *.fasta 

Input and output

Input:
sample id and trimmed reads

Output:
sample id and ${sample_id}.taxonomy.txt

Flag to specify nr CPUs

--cpu

MetaxaQR_ttt

This tool converts MetaxaQR’s output to a workable tsv file.

Code

metaxaQR_ttt -i ${METAXAQR_OUTPUT} -o ${SAMPLE_ID}  -m 6

Flag to specify nr CPUs

None

Input and Output

Input:
sample_id and output from MetaxaQR

Output:
${sample_id}.level_1.txt
${sample_id}.level_6.txt

MetaxaQR_dc

This tool takes all output files from MetaxaQR_ttt and concatenates them into a single file.

Code

metaxaQR_dc *.level_6.txt 

Note Do not forget to use the collect operator for the level_6.txt input.

Flag to specify nr CPUs

None

Input and Output

Input:
level_6.txt 

Output:
collected_data.txt

Gene_Normalization

This script:

  • Identifies the number of 16S rRNA sequences per sample.
  • Chooses the best match per read from the diamond output.
  • Groups and summarizes the abundance of each individual resistance gene.
  • Normalizes the antibiotic resistance gene abundance per sample.
  • Assigns each resistance gene a resistance phenotype.

Code

Copy this code to a script and save it on the cluster as {NAME}.py.

import glob
import pandas as pd
from concurrent.futures import ProcessPoolExecutor

def count_16s_sequences():
    """
    Counts the number of 16s sequences in files matching '*.level_1.txt'.
    
    Returns:
        dict: A dictionary mapping sample IDs to the number of 16s sequences.
    """
    # Find all relevant MetaxaQR files
    files = glob.glob('*.level_1.txt')
    sequence_count_dict = {}
     
    # Load files and append sample_id and nr SSU to dict   
    for file in files:
        sample_id = file.split('.level_1.txt')[0]
        df = pd.read_csv(file, sep='\t', names=['kingdom', 'nr_ssu'])
        bacteria_count = list(df[df.kingdom == 'Bacteria'].nr_ssu)[0]
        # Dict with sample ID as key and nr bacterial SSU as value
        sequence_count_dict[sample_id] = bacteria_count
    
    return sequence_count_dict

def filter_group_for_best_alignment(group):
    """
    Filters a DataFrame group to find the best alignment based on criteria.

    Args:
        group (DataFrame): The DataFrame group to filter.

    Returns:
        DataFrame: A DataFrame containing the filtered results.
    """
    # The best hit has the lowest e-value and the highest pident and length
    # If multiple hits have the same values, return a randomly selected best hit
    group = group[group['evalue'] == group['evalue'].min()]
    group = group[group['pident'] == group['pident'].max()]
    group = group[group['length'] == group['length'].max()]
    
    return group.sample(1, random_state=1)

def process_file(file, mode):
    """
    Processes a single file to group and filter data based on sequence identity.

    Args:
        file (str): The file path to process.
        mode (str): The mode of processing, affects file parsing.

    Returns:
        DataFrame: A DataFrame with processed and filtered data.
    """
    sample_id = file.split(f'.tsv')[0]
    colnames = ['qseqid', 'sseqid', 'pident', 'evalue', 'length', 'slen']
    df = pd.read_csv(file, sep='\t', names=colnames)
    # Function for selecting best hit
    df_grouped = df.groupby('qseqid').apply(filter_group_for_best_alignment).reset_index(drop=True)
    df_grouped['sample_id'] = sample_id
    return df_grouped[['sseqid', 'slen', 'sample_id']]

def process_file_wrapper(file, mode):
    """
    Wrapper function for process_file to make it compatible with ProcessPoolExecutor.

    Args:
        file (str): The file path to process.
        mode (str): The mode of processing.

    Returns:
        DataFrame: A DataFrame with processed and filtered data.
    """
    return process_file(file, mode)

def get_files(mode):
    """
    Retrieves and processes files in parallel, concatenating the results.

    Args:
        mode (str): The mode of processing to determine file patterns.

    Returns:
        DataFrame: A concatenated DataFrame of all processed files.
    """
    files = glob.glob(f'*.tsv')
    
    # Parallelizes processing files
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(process_file_wrapper, files, [mode]*len(files)))
    
    # Output here is the best hit for each read
    return pd.concat(results, ignore_index=True)

def normalize_gene_data(df_temp, nr_ssu):
    """
    Normalizes gene data based on sequence length and 16s sequence counts.

    Args:
        df_temp (DataFrame): The DataFrame with gene data.
        nr_ssu (dict): A dictionary mapping sample IDs to 16s sequence counts.

    Returns:
        DataFrame: A DataFrame with normalized gene data.
    """
    # Counts how many reads aligned with each ARG/MGE
    gene_counts = df_temp['sseqid'].value_counts().reset_index()
    gene_counts.columns = ["Genes", "Count"]
    
    # Add data like gene length etc
    df_temp = df_temp.merge(gene_counts, left_on='sseqid', right_on='Genes', how='right')
    
    # add nr of ssu for each sample
    df_temp['nr_ssu'] = df_temp['sample_id'].map(nr_ssu)
    
    # Normalize gene abundance  
    df_temp['Gene_normalized'] = (df_temp['Count'] / df_temp['slen']) / (df_temp['nr_ssu'] / 720)

    # Due to me writing the script in a strange way we will get duplicate rows with identical data
    # Output here is a non redundant df.
    df_temp.drop_duplicates(subset=['sseqid', 'sample_id'], inplace=True)
    
    return df_temp[['Genes', 'Count', 'nr_ssu', 'slen', 'Gene_normalized', 'sample_id']]

def read_and_normalize_files(mode, nr_ssu):
    """
    Reads and normalizes files based on the specified mode and 16s sequence counts.

    Args:
        mode (str): The mode of processing.
        nr_ssu (dict): A dictionary mapping sample IDs to 16s sequence counts.

    Returns:
        DataFrame: A DataFrame with normalized gene data.
    """
    df_temp = get_files(mode)
    return normalize_gene_data(df_temp, nr_ssu)

def assign_phenotypes_to_args(diamond_out):
    """
    Assigns phenotypes to ARGs based on gene accession numbers.

    Args:
        diamond_out (DataFrame): The DataFrame containing ARG data.
    """
    # Load phenotypic information from ResFinder
    cols = ['Gene_accession no.', 'Class']
    phenotype_data = pd.read_csv('phenotypes.txt', sep='\t', usecols=cols)
    
    # Handling multiple resistance phenotypes
    # looking for ',' since phenotypes are written "tetracycline, aminoglycosides" etc.
    # eg. If a gene confers resistance towards more than 1 antibiotic is it called "multiple"
    multiple_resistance_classes = [cls for cls in phenotype_data['Class'].unique() if ',' in cls]
    
    # Prodigal adds _ and a number to each translated gene name. 
    # Here I remove those characters from the gene name so it matches 
    # with the one in the ResFinder Phenotype file.
    diamond_out['Phenotype'] = diamond_out['Genes'].apply(lambda x: '_'.join(x.rsplit('_', 1)[0:-1]))
    
    # Replace phenotype to multiple if in multiple_resistance_classes list
    phenotype_data.loc[phenotype_data['Class'].isin(multiple_resistance_classes), 'Class'] = 'Multiple'
    
    # There are some genes which have multiple entries in the Phenotype list
    # Here I make sure that they are classified as Multiple
    duplicated_args = phenotype_data[phenotype_data.duplicated(['Gene_accession no.'])]
    phenotype_data.loc[phenotype_data['Gene_accession no.'].isin(duplicated_args["Gene_accession no."]), 'Class'] = 'Multiple'
    phenotype_data.drop_duplicates(subset=['Gene_accession no.'], inplace=True, keep='first')
    phenotype_data.set_index('Gene_accession no.', inplace=True)
    
    # Convert phenotype_data to a dict and use it to translate gene names to phenotypes
    phenotype_dict = phenotype_data['Class'].to_dict()
    diamond_out.replace({"Phenotype": phenotype_dict}, inplace=True)
    diamond_out.to_csv('ARG_norm.tsv', sep='\t', index=False)

def main():
    """
    Main function to process files and infer phenotypes.
    """
    modes = ['ResFinder']
    for mode in modes: 
        SSU_counts = count_16s_sequences()
        normalized_files = read_and_normalize_files(mode, SSU_counts)  
        if mode == 'ResFinder':
            assign_phenotypes_to_args(normalized_files)
        elif mode == 'mobileOG':
            normalized_files.to_csv('MGE_norm.tsv', sep='\t', index=False)

if __name__ == '__main__':
    main()

Flag to specify nr CPUs

None

How to execute script

python3 {PATH_TO_SCRIPT}/{NAME}.py
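
If you call this script from within a Nextflow process, one option (a sketch, assuming you saved the script next to main.nf) is to build the path with the implicit projectDir variable:

shell:
"""
python3 ${projectDir}/{NAME}.py
"""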

Input and Output

Note Do not forget to use the collect operator for the ${sample_id}.tsv and ${sample_id}.level_1.txt input.

Input:
phenotypes.txt
${sample_id}.tsv 
${sample_id}.level_1.txt 

Output:
ARG_norm.tsv

MultiQC

Code

multiqc -f *fastqc.zip

Input and Output

Note Do not forget to use the collect operator for the *fastqc.zip.

Input:
*fastqc.zip 

Output:
multiqc_report.html
multiqc_data

Flag to specify nr CPUs

None

Where Can You Find All the Necessary Files?

You can locate all the required files for this tutorial at the following link: [Insert Link Here].

Building the Container

Below is the code for the container definition file. Simply copy this code into a file on the cluster and save it as {name}.def. To build the container, execute the command apptainer build {name}.sif {name}.def. Please note, building the container might take some time, so it’s best to start this at the beginning of the tutorial.

Bootstrap: docker
From: ubuntu:23.10

%post
  export DEBIAN_FRONTEND=noninteractive
  # Update OS and install packages
  apt-get update 
  apt-get install -y \
    git \
    wget \
    original-awk \
    python3.11 \
    python3-pip \
    zlib1g-dev \
    curl \
    cutadapt \
    fastqc \
    mafft \
    hmmer \
    autoconf \
    prodigal

  apt-get clean

  # Install Nextflow 
  mkdir Nextflow
  cd Nextflow
  wget -qO- https://get.nextflow.io | bash
  chmod +x nextflow
  cd ..

  # Install Vsearch
  wget https://github.com/torognes/vsearch/archive/v2.25.0.tar.gz
  tar xzf v2.25.0.tar.gz
  cd vsearch-2.25.0
  ./autogen.sh
  ./configure CFLAGS="-O3" CXXFLAGS="-O3"
  make
  make install
  cd ..
  rm v2.25.0.tar.gz

  # Install cd-hit
  wget https://github.com/weizhongli/cdhit/releases/download/V4.8.1/cd-hit-v4.8.1-2019-0228.tar.gz
  tar xvf cd-hit-v4.8.1-2019-0228.tar.gz
  rm -rf cd-hit-v4.8.1-2019-0228.tar.gz
  cd cd-hit-v4.8.1-2019-0228
  make
  cd cd-hit-auxtools
  make
  cd ..
  cd ..
  
  # install Trim Galore
  curl -fsSL https://github.com/FelixKrueger/TrimGalore/archive/0.6.10.tar.gz -o trim_galore.tar.gz
  tar xvzf trim_galore.tar.gz
  rm trim_galore.tar.gz

  # Install Diamond
  mkdir Diamond
  cd Diamond
  wget http://github.com/bbuchfink/diamond/releases/download/v2.1.8/diamond-linux64.tar.gz
  tar xzf diamond-linux64.tar.gz
  rm -rf diamond-linux64.tar.gz
  cd ..

  # Install MetaxaQR 
  git clone https://github.com/bengtssonpalme/MetaxaQR.git
  chmod +x MetaxaQR/*
  MetaxaQR/metaxaQR_install_database -g SSU -v SILVA138
  
  # Install python packages
  python3 -m pip install  pandas==2.1.3 biopython==1.81 multiqc==1.18 --break-system-packages

%environment
  export LC_ALL=C
  export PATH=/MetaxaQR/:$PATH
  export PATH=/Nextflow/:$PATH
  export PATH=/TrimGalore-0.6.10/:$PATH
  export PATH=/Diamond/:$PATH
  export PATH=/vsearch-2.25.0/:$PATH
  export PATH=/cd-hit-v4.8.1-2019-0228/:$PATH

Let’s Get Started!

Now, it’s time for you to define processes and channels and connect them in a workflow. But first, you need to add a configuration file.

Where is the data?

The input fastq files for the pipeline can be found here /cephyr/NOBACKUP/groups/bbt045_2024/turorial_Nextflow/data/.

Adding a Configuration File

A configuration file is essential for running your pipeline in Nextflow. While Nextflow allows extensive customization, we will only cover the basics in this workshop. Combine all the following code snippets into a single document named nextflow.config.

Enable DSL 2

DSL2 is the latest version of the Nextflow language.

nextflow.enable.dsl=2

Parameters

params.files = ""
params.cluster_options = ""
params.container = ""

This setup creates the following flags:

  • --files: Specify the location of the FASTQ files.

  • --cluster_options: Enter specific options for the cluster. This should be assigned the value "-A C3SE2023-2-17 -p vera".

  • --container: Define where the container is located.

These will become variables in your processes and workflows. You can name them as you like. Alternatively, assign values directly here, like params.container = "{PATH}/container.sif".
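
For example (a sketch; the glob assumes you will use fromFilePairs, and {PATH} is a placeholder), the flags could be supplied on the command line like this. Quote the glob so the shell does not expand it before Nextflow sees it:

nextflow run main.nf \
    --files '/cephyr/NOBACKUP/groups/bbt045_2024/turorial_Nextflow/data/*_R{1,2}.fastq.gz' \
    --cluster_options '-A C3SE2023-2-17 -p vera' \
    --container '{PATH}/container.sif'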

Profiles

Your configuration will vary depending on whether you’re running Nextflow from the command line or via an sbatch script. Here, we define profiles that tell Nextflow how to behave in local or Slurm environments. Later on, you can use -profile 'local' or -profile 'slurm' to select the desired environment from the command line or an sbatch script.

Note that you only need one dash here: -profile is a pre-defined Nextflow flag. All flags you define through params. require two dashes (--). If you want to read more about how to configure the config file, you can read about it here.

profiles {
    local {
        process.cache = true 
        process.container = params.container
        process.executor = 'local'
        apptainer.enabled = true
        apptainer.autoMounts = true

    }

    slurm {
        process.cache = true
        process.container = params.container
        process.executor = 'slurm'
        process.clusterOptions = params.cluster_options
        process.scratch = true
        apptainer.enabled = true
        apptainer.autoMounts = true   
    }
}

Start Building the Pipeline

  1. Save the config file in the same directory and name it nextflow.config. Nextflow will automatically find this file. If the file is located elsewhere, specify its location using the -c flag.
  2. Begin by creating processes for the tools (Section 6.1) and connect them using a workflow (Section 6). The file containing the processes and workflow should have the .nf extension and be called main.nf.
  3. Use the publishDir directive to tell Nextflow where to store the pipeline’s output (see the sketch after this list). More about this can be found here.
  4. Test locally with a single file using an interactive Slurm session. Find more information about nextflow run here.
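
The publishDir directive mentioned in step 3 is set at the top of a process; a minimal sketch (the directory name, mode, and process are just examples):

process Example_process {
    // copy the declared outputs from the work directory to results/example
    publishDir "results/example", mode: 'copy'

    output:
    path "output_file.txt"

    shell:
    """
    echo "hello" > output_file.txt
    """
}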

If you haven’t specified the values of params.container and params.files in the config file, you’ll need to include them in the command line.

On Vera, you can either submit a non-interactive sbatch job to run in the background, or start an interactive session like the one below (adjust --cpus-per-task to the number of CPUs you need, and change -t to the estimated duration in HH:MM:SS):

srun -A C3SE2023-2-17 --nodes 1 --ntasks 1 --cpus-per-task 10  -t 00:00:00  --pty bash -is

Nextflow is preinstalled on the cluster. All you have to do is to load it.

module load Nextflow/23.10.0

To check that it loaded correctly you type:

nextflow -h

If you got the help output everything worked as expected!

You run the pipeline by executing this command on the command line.

nextflow run main.nf -profile 'local' -resume

The -resume flag enables Nextflow to start the pipeline from where you last left off. Nextflow automatically caches runs, but it does not automatically resume them unless you use this flag.

Once your pipeline works correctly with one file, try running it with all the FASTQ files in /cephyr/NOBACKUP/groups/bbt045_2024/turorial_Nextflow/data/.

Adding Trace Report and Process Execution Time

Gaining detailed insights into the resource requirements of a process can be very valuable. Questions like how long a process ran, how much RAM it consumed, and whether it fully utilized the allocated CPU cores are crucial. This information can help you benchmark different tools to find the best fit for your needs based on available resources. Moreover, it’s likely you’ll use this information to specify the resource demand for each tool in the next step of this workshop. You can find more about what this report looks like and how to activate it here. Also, don’t forget to activate the timeline report function.
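
A sketch of what to add to nextflow.config to turn both reports on (the file names are just suggestions):

trace {
    enabled   = true
    file      = 'trace_report.txt'
    overwrite = true     // allow the report to be overwritten on repeated runs
}

timeline {
    enabled   = true
    file      = 'timeline_report.html'
    overwrite = true
}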

Adding Resource Requirements

Review the tracing and timeline report to determine the resource demands of each process. Then, specify the time, cpus, and memory each process needs.

When you specify cpus, you tell Nextflow that the process is supposed to use x number of CPUs. You also need to tell the tool the number of CPUs it is supposed to use, which is usually done via a flag. The value you assign to cpus is also saved in a variable called ${task.cpus}. Pass this variable to the CPU allocation flag of each tool that needs it, so the right number of CPUs is assigned automatically. A sketch of this is shown below.
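
A sketch, using TrimGalore as the example (the directive values are placeholders to be replaced with what the trace report tells you; the input and output placeholders follow the template style used earlier):

process TrimGalore {
    cpus 5               // number of CPU cores Nextflow reserves for this process
    memory '8 GB'        // placeholder: check the trace report for real usage
    time '2h'

    input:
    {INPUTS AS BEFORE}

    output:
    {OUTPUTS AS BEFORE}

    shell:
    """
    trim_galore --paired -j ${task.cpus} --fastqc ${ENTER_READ_R1} ${ENTER_READ_R2}
    """
}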

Compare the timeline report before and after you added the resource requirements.

Why is this important? No matter where you run your processes (locally or on a cluster), there will be resource limitations. By telling Nextflow the amount of resources each process requires, it can optimize resource utilization through parallelization.

Submitting a Job Using Slurm

Here’s an sbatch script you’ll need to modify slightly. Replace {NAME} with a name for your run (the specific name isn’t crucial). Then, indicate the expected duration of the run after the -t flag (in HH:MM:SS). Remember, the time specified here is for the entire pipeline, not individual processes. Also, replace {PATH} with where you’d like the stdout and stderr output to go. This will help you monitor the pipeline’s progress and catch any errors.

This sbatch job will submit a separate sbatch job for each process.

#!/usr/bin/env bash
#SBATCH -A C3SE2023-2-17 -p vera
#SBATCH -J {NAME}
#SBATCH -c 1
#SBATCH -t 00:00:00
#SBATCH --error={PATH}/job.%J.err 
#SBATCH --output={PATH}/job.%J.out 

# Unload unwanted packages and load Nextflow
module purge
module load Nextflow/23.10.0

# Add flags if relevant
nextflow run main.nf -profile 'slurm' -resume

Save this script as {NAME}.sh. To submit the job, enter sbatch {NAME}.sh. To monitor how the job is going, you can open the {PATH}/job.%J.err or {PATH}/job.%J.out files. If you want a list of all your submitted jobs, use the command squeue -u {your_username}. If you want information about a specific job, use squeue -j {job_id}.

For this job, I want you to analyze all FASTQ files in the directory. When it has finished, take a look at the timeline report. Can you now see the value of using Nextflow to parallelize your processes?