Nextflow workshop
Goals of This Tutorial
- Gain a clear understanding of how Nextflow functions.
- Learn to create your own pipeline.
- Discover how to use Nextflow operators to simplify your coding.
- Explore the integration of containers with Nextflow.
- Learn the steps to launch a job on the Vera cluster.
Layout of This Tutorial
In this tutorial, you will primarily work with pre-written code. We will provide you with practical examples and helpful hints. However, you will be crafting much of the code on your own. Use the Nextflow documentation as your primary resource. Of course, feel free to ask as many questions as you need!
We’ll begin with a brief presentation on Nextflow’s mechanics, followed by hands-on coding sessions.
Processes and channels
Processes
In practice a Nextflow pipeline script is made by joining together different processes. Each process can be written in any scripting language that can be executed by the Linux platform (Bash, Perl, Ruby, Python, etc.).
Processes are executed independently and are isolated from each other, i.e. they do not share a common (writable) state. The only way they can communicate is via asynchronous FIFO queues, called channels in Nextflow. The fact that the flow of data is asynchronous means that the output files might not be in the same order as the input files, i.e. Nextflow does not care about the order of the files, only that they flow between processes in the proper order.
Any process can define one or more channels as input and output. The interaction between these processes, and ultimately the pipeline execution flow itself, is implicitly defined by these input and output declarations.
In Nextflow, you decide what kind of data each process handles. This includes defining inputs and outputs. Inputs and outputs can vary – they might be a simple variable, a file path, standard output, etc. This versatility is what makes channels in Nextflow so powerful. You can seamlessly integrate file paths with variables (like sample names), as you’ll discover in this tutorial.
The inputs and outputs for each process are defined within the process itself. But how do you link the output of one process to the input of another? Let’s look at an example workflow where this connection is made.
In Nextflow, processes are defined within curly braces {}. Each process specifies its inputs, outputs, and the command to be executed.
process {NAME OF PROCESS} {
    input:
    {INPUT FILES}

    output:
    {OUTPUT FILES}

    script:
    """
    {SCRIPT TO RUN}
    """
}
Example processes
Process 1: Initial Data Processing
process Process_1 {
    input:
    val VAL_1
    path INPUT_1

    output:
    val VAL_1
    path "output_file_1.txt"

    shell:
    """
    program_x --input ${INPUT_1} --output output_file_1.txt
    """
}
Explanation:
Inputs: val VAL_1 and path INPUT_1 are placeholders for the input data. The actual data these variables represent will be assigned later in the workflow.
Outputs: The process outputs the same val VAL_1 it received and creates a new file, output_file_1.txt.
Shell Command: Executes program_x using the input file and generates an output file. Writing ${INPUT_1} allows the Shell command to access that variable.
Process 2: Further Data Manipulation
process Process_2 {
    input:
    val VAL_1
    path OUTPUT_1

    output:
    path "output_file_2.txt"

    shell:
    """
    program_y --input ${OUTPUT_1} --output output_file_2.txt
    """
}
Explanation:
Inputs: This process takes the output from Process_1. path OUTPUT_1 is the output file from Process_1.
Output: Generates a new file, output_file_2.txt.
Shell Command: Runs program_y using the output from Process_1.
Workflow: Connecting Processes
workflow {
    process_1 = Process_1("human_gut_1", "/cephyr/NOBACKUP/groups/bbt045_2024/turorial_Nextflow/data/human_gut_1_R1.fastq.gz")
    Process_2(process_1[0], process_1[1])
}
Explanation:
Variable process_1: This is a user-defined variable that stores the outputs from Process_1. The name is arbitrary but should be descriptive.
Connecting Processes: Process_2 takes the outputs from process_1 (process_1[0] and process_1[1]) as its inputs, linking the two processes in the workflow.
In Nextflow, each process has an output: section where we define what it produces and an input: section for what it needs to start. These sections, along with the workflow structure, are what make Nextflow so efficient and powerful. Within the workflow, a variable like process_1 holds the outputs from its corresponding process in a list format. For example, by using process_1[0], you access the first output specified in Process_1 (VAL_1). To learn more about how workflows function, check out this Workflow Guide.
Nextflow executor
In the Nextflow framework architecture, the executor is the component that determines the system where a pipeline process is run and supervises its execution.
The executor provides an abstraction between the pipeline processes and the underlying execution system. This allows you to write the pipeline functional logic independently from the actual processing platform.
In other words, you can write your pipeline script once and have it running on your computer, a cluster resource manager, or the cloud — simply change the executor definition in the Nextflow configuration file.
Slurm
The slurm executor allows you to run your pipeline script using the SLURM resource manager. Nextflow manages each process as a separate job that is submitted to the cluster using the sbatch command. The pipeline must be launched from a node where the sbatch command is available, which is typically the cluster login node. To enable the SLURM executor, set process.executor = 'slurm' in the nextflow.config file.
The administrators of the Vera cluster ask that you use their TMPDIR for executing processes to reduce I/O in the main storage. What does this mean? It means that the files for a process are symlinked to this directory, and that all temporary output files are located there. The final output is then copied to the permanent storage, and all temporary files are deleted. You activate this by setting process.scratch = true in the config file (already included for you).
Tips and tricks
Operators
Operators are helper functions which can be very useful, especially when directing data streams between processes. They can be used to combine, split, and transform channels, and much more. Here is the documentation about operators; the most useful and commonly used operators are listed at the top of that page.
View
Sometimes it might be difficult to know what files flow through each channel. If so, you can use the view operator to print the contents of a channel.
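For instance, a minimal sketch (the glob below is just an illustration, not the tutorial data path) could look like this inside a workflow block:

workflow {
    // Print every item that flows through the channel
    Channel
        .fromPath("data/*.fastq.gz")   // illustrative path
        .view()
}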
Collect
Normally, Nextflow provides a process with one set of files per sample. But what if a process needs the files produced for all samples? This is the case for a summary process such as MultiQC, which combines all the quality reports from TrimGalore!. Can you figure out how to use the collect operator?
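As a small, self-contained sketch (the numbers are just placeholders), collect gathers every item of a channel into a single list that is emitted once:

workflow {
    Channel
        .of(1, 2, 3, 4)
        .collect()
        .view()   // emits one item: [1, 2, 3, 4]
}

Applied to this pipeline, collecting a channel of quality reports lets a summary process receive all of them in a single task.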
Description of the Pipeline
In this tutorial, you’ll be recreating a version of a pipeline used for determining the composition of antibiotic resistance genes and taxonomy in a metagenomic sample. We’ll provide the commands for each process, and guide you on the order in which they should be executed. Additionally, we’ll offer hints on linking channels to processes. If anything is unclear or if you have questions at any point, please don’t hesitate to ask!
Each process
FASTQ files
This is technically not a process, but part of the workflow. The primary input for this pipeline is FASTQ files. To efficiently add these files to a channel, we’ll use the fromFilePairs operator. This operator creates a tuple for each pair of FASTQ files: [sample_id, [R1, R2]]. This approach simplifies tracking which sample each read belongs to and ensures that the pairs of R1 and R2 files are correctly associated. Learn more about using this operator here.
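A minimal sketch of how this could look inside your workflow block, assuming params.files holds a glob such as {PATH_TO_DATA}/*_R{1,2}.fastq.gz:

workflow {
    // One tuple per sample: [sample_id, [R1, R2]]
    reads_ch = Channel.fromFilePairs(params.files, checkIfExists: true)
    reads_ch.view()   // e.g. [human_gut_1, [.../human_gut_1_R1.fastq.gz, .../human_gut_1_R2.fastq.gz]]
}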
Build_ResFinder_db
ResFinder is an extensively used, manually curated database. It contains antibiotic resistance genes, particularly those found in mobile genetic elements.
This script:
- Downloads the database in DNA format by cloning it from GitHub.
- Creates a file with all antibiotic resistance genes.
- prodigal is used to identify the coding part of each gene and translate it into amino acids.
- sed is used to remove unnecessary characters added by prodigal.
- ResFinder can contain genes which are very similar to each other. cd-hit clusters them at 95% identity to create a non-redundant database, where diamond will have a greater likelihood of differentiating the genes from each other.
- diamond makedb is used to convert the ResFinder database to a format that diamond can use.
Code
# Download ResFinder from github
git clone https://git@bitbucket.org/genomicepidemiology/resfinder_db.git
# Combines individual ARG files
cat resfinder_db/*.fsa > Combined_Resfinder.fsa
# Remove unnecessary dir
mv resfinder_db/* .
rm -r resfinder_db/
# Translates DNA to protein
prodigal -i Combined_Resfinder.fsa -a Combined_Resfinder.translations.faa -p meta
# Removes characters added by prodigal during translation
sed -i 's/\\*//g' Combined_Resfinder.translations.faa
sed -i 's/ .*\$//' Combined_Resfinder.translations.faa
# Cluster db at 95% identity
cd-hit -i Combined_Resfinder.translations.faa -o Combined_Resfinder.translations_cluster_95.fasta -c 0.95 -n 5 -M 0 -d 1000 -T 1
# Create Diamond db
diamond makedb --in Combined_Resfinder.translations_cluster_95.fasta -d ResFinderARGs
Flag to specify nr CPUs
None
Data flow
input: None
output:
ResFinderARGs.dmnd
Combined_Resfinder.translations.faa
phenotypes.txt
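If you want a starting point, one possible way to wrap the commands above in a process is sketched below (treat it as a sketch to adapt, not the definitive answer); it takes no input and declares the three files listed under Data flow as outputs:

process Build_ResFinder_db {
    output:
    path "ResFinderARGs.dmnd"
    path "Combined_Resfinder.translations.faa"
    path "phenotypes.txt"

    script:
    """
    git clone https://git@bitbucket.org/genomicepidemiology/resfinder_db.git
    cat resfinder_db/*.fsa > Combined_Resfinder.fsa
    mv resfinder_db/* .
    rm -r resfinder_db/
    prodigal -i Combined_Resfinder.fsa -a Combined_Resfinder.translations.faa -p meta
    sed -i 's/\\*//g' Combined_Resfinder.translations.faa
    sed -i 's/ .*\$//' Combined_Resfinder.translations.faa
    cd-hit -i Combined_Resfinder.translations.faa -o Combined_Resfinder.translations_cluster_95.fasta -c 0.95 -n 5 -M 0 -d 1000 -T 1
    diamond makedb --in Combined_Resfinder.translations_cluster_95.fasta -d ResFinderARGs
    """
}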
TrimGalore
This tool performs quality trimming of the reads.
- Removes sequences of poor quality or insufficient length
- Removes sequencing adapters
- Outputs quality reports
Code
trim_galore --paired -j 5 --fastqc ${ENTER_READ_R1} ${ENTER_READ_R2}
Flag to specify nr CPUs
-j
Input and output
Input:
sample_id and reads
Output:
sample id, *R1*val*.fq.gz and *R2*val*.fq.gz # Write this in such a way that it captures both R1 and R2 in one glob pattern
*${sample_id}*fastqc.zip
*${sample_id}*trimming_report.txt
You should write the glob pattern in such a way that the first output contains both the R1 and R2 files.
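One possible sketch of this process is shown below; it assumes the input tuple comes from fromFilePairs and uses a single glob that combines the R1/R2 patterns from the hint above (adjust the patterns if your file names differ):

process TrimGalore {
    input:
    tuple val(sample_id), path(reads)

    output:
    tuple val(sample_id), path("*R{1,2}*val*.fq.gz")
    path "*${sample_id}*fastqc.zip"
    path "*${sample_id}*trimming_report.txt"

    script:
    """
    trim_galore --paired -j 5 --fastqc ${reads[0]} ${reads[1]}
    """
}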
Diamond
This tool is a faster version of BLASTX (100x to 1,000x faster). It translates the DNA reads into the 6 possible amino acid reading frames and aligns them to the amino acid database.
Code
diamond blastx --query ${R1_read} --db ${ENTER_DB} -o ${ENTER_SAMPLE_ID}.tsv --id 90 --query-cover 80 --threads 5 --outfmt 6 qseqid sseqid pident evalue length slen
Flag to specify nr CPUs
--threads
Input and output
Input:
sample id and trimmed reads
ResFinderARGs.dmnd
Output:
{SAMPLE_ID}.tsv
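A hedged sketch of how the inputs and output could be declared is shown below. Because Build_ResFinder_db runs only once, you may need to turn its output into a value channel (for example with .first() or .collect()) before passing it in alongside the per-sample channel, so the database is reused for every sample:

process Diamond {
    input:
    tuple val(sample_id), path(reads)
    path db

    output:
    path "${sample_id}.tsv"

    script:
    """
    diamond blastx --query ${reads[0]} --db ${db} -o ${sample_id}.tsv --id 90 --query-cover 80 --threads 5 --outfmt 6 qseqid sseqid pident evalue length slen
    """
}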
MetaxaQR
MetaxaQR is a tool that determines the taxonomic composition of a sample. It uses HMMs to identify reads from the 16s rRNA gene. This gene contains regions that are conserved across bacteria, making it easy to identify, as well as hypervariable regions that make it possible to distinguish between taxa.
This script:
- Unzips the read files
- Performs taxonomic classification
- Removes the unzipped metagenomic reads
- Gzips the MetaxaQR fasta output
Code
# Unzip metagenomes
zcat ${READS_R1} > read_R1.fastq &
zcat ${READS_R2} > read_R2.fastq &
# Wait for the background processes to finish
wait
# Run MetaxaQR
metaxaQR -1 read_R1.fastq -2 read_R2.fastq -o ${SAMPLE_ID} --cpu 5 -g SSU -d /MetaxaQR/metaxaQR_db/SSU/mqr
# Remove unzipped metagenomes
rm read_R*.fastq
# Zip fasta output
pigz -p 5 *.fasta
Input and output
Input:
sample id and trimmed reads
Output:
sample id and ${sample_id}.taxonomy.txt
Flag to specify nr CPUs
--cpu
MetaxaQR_ttt
This tool converts MetaxaQR’s output to a workable TSV file.
Code
metaxaQR_ttt -i ${METAXAQR_OUTPUT} -o ${SAMPLE_ID} -m 6
Flag to specify nr CPUs
None
Input and Output
Input:
sample_id and output from MetaxaQR
Output:
${sample_id}.level_1.txt
${sample_id}.level_6.txt
MetaxaQR_dc
This tool takes all output files from MetaxaQR_ttt and concatenates them into a single file.
Code
metaxaQR_dc *.level_6.txt
Note: Do not forget to use the collect operator for the level_6.txt input.
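For example, the wiring inside the workflow block might look roughly like this, assuming the level_6.txt file is the second output declared in MetaxaQR_ttt:

// Gather every sample's level_6.txt into a single task
MetaxaQR_dc(MetaxaQR_ttt.out[1].collect())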
Flag to specify nr CPUs
None
Input and Output
Input:
level_6.txt
Output:
collected_data.txt
Gene_Normalization
This script:
- Identifies the number of 16s rRNA sequences per sample.
- Chooses the best match per read from the diamond output.
- Groups and summarizes the abundance of each individual resistance gene.
- Normalizes the antibiotic resistance gene abundance per sample.
- Assigns each resistance gene a resistance phenotype.
Code
Copy this code into a script and save it on the cluster as {NAME}.py.
import glob
import pandas as pd
from concurrent.futures import ProcessPoolExecutor


def count_16s_sequences():
    """
    Counts the number of 16s sequences in files matching '*.level_1.txt'.

    Returns:
        dict: A dictionary mapping sample IDs to the number of 16s sequences.
    """
    # Find all relevant MetaxaQR files
    files = glob.glob('*.level_1.txt')
    sequence_count_dict = {}

    # Load files and append sample_id and nr SSU to dict
    for file in files:
        sample_id = file.split('.level_1.txt')[0]
        df = pd.read_csv(file, sep='\t', names=['kingdom', 'nr_ssu'])
        bacteria_count = list(df[df.kingdom == 'Bacteria'].nr_ssu)[0]
        # Dict with sample ID as key and nr bacterial SSU as value
        sequence_count_dict[sample_id] = bacteria_count

    return sequence_count_dict
def filter_group_for_best_alignment(group):
    """
    Filters a DataFrame group to find the best alignment based on criteria.

    Args:
        group (DataFrame): The DataFrame group to filter.

    Returns:
        DataFrame: A DataFrame containing the filtered results.
    """
    # Best hit has lowest e-value, highest pident and length
    # If multiple hits have the same values, return a randomly selected best hit
    group = group[group['evalue'] == group['evalue'].min()]
    group = group[group['pident'] == group['pident'].max()]
    group = group[group['length'] == group['length'].max()]

    return group.sample(1, random_state=1)
def process_file(file, mode):
    """
    Processes a single file to group and filter data based on sequence identity.

    Args:
        file (str): The file path to process.
        mode (str): The mode of processing, affects file parsing.

    Returns:
        DataFrame: A DataFrame with processed and filtered data.
    """
    sample_id = file.split('.tsv')[0]
    colnames = ['qseqid', 'sseqid', 'pident', 'evalue', 'length', 'slen']
    df = pd.read_csv(file, sep='\t', names=colnames)
    # Function for selecting best hit
    df_grouped = df.groupby('qseqid').apply(filter_group_for_best_alignment).reset_index(drop=True)
    df_grouped['sample_id'] = sample_id
    return df_grouped[['sseqid', 'slen', 'sample_id']]


def process_file_wrapper(file, mode):
    """
    Wrapper function for process_file to make it compatible with ProcessPoolExecutor.

    Args:
        file (str): The file path to process.
        mode (str): The mode of processing.

    Returns:
        DataFrame: A DataFrame with processed and filtered data.
    """
    return process_file(file, mode)


def get_files(mode):
    """
    Retrieves and processes files in parallel, concatenating the results.

    Args:
        mode (str): The mode of processing to determine file patterns.

    Returns:
        DataFrame: A concatenated DataFrame of all processed files.
    """
    files = glob.glob('*.tsv')

    # Parallelizes processing of the files
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(process_file_wrapper, files, [mode] * len(files)))

    # Output here is the best hit for each read
    return pd.concat(results, ignore_index=True)
def normalize_gene_data(df_temp, nr_ssu):
    """
    Normalizes gene data based on sequence length and 16s sequence counts.

    Args:
        df_temp (DataFrame): The DataFrame with gene data.
        nr_ssu (dict): A dictionary mapping sample IDs to 16s sequence counts.

    Returns:
        DataFrame: A DataFrame with normalized gene data.
    """
    # Counts how many reads aligned with each ARG/MGE
    gene_counts = df_temp['sseqid'].value_counts().reset_index()
    gene_counts.columns = ["Genes", "Count"]

    # Add data like gene length etc
    df_temp = df_temp.merge(gene_counts, left_on='sseqid', right_on='Genes', how='right')

    # Add nr of ssu for each sample
    df_temp['nr_ssu'] = df_temp['sample_id'].map(nr_ssu)

    # Normalize gene abundance
    df_temp['Gene_normalized'] = (df_temp['Count'] / df_temp['slen']) / (df_temp['nr_ssu'] / 720)

    # Due to me writing the script in a strange way we will get duplicate rows with identical data
    # Output here is a non redundant df.
    df_temp.drop_duplicates(subset=['sseqid', 'sample_id'], inplace=True)

    return df_temp[['Genes', 'Count', 'nr_ssu', 'slen', 'Gene_normalized', 'sample_id']]


def read_and_normalize_files(mode, nr_ssu):
    """
    Reads and normalizes files based on the specified mode and 16s sequence counts.

    Args:
        mode (str): The mode of processing.
        nr_ssu (dict): A dictionary mapping sample IDs to 16s sequence counts.

    Returns:
        DataFrame: A DataFrame with normalized gene data.
    """
    df_temp = get_files(mode)
    return normalize_gene_data(df_temp, nr_ssu)
def assign_phenotypes_to_args(diamond_out):
    """
    Assigns phenotypes to ARGs based on gene accession numbers.

    Args:
        diamond_out (DataFrame): The DataFrame containing ARG data.
    """
    # Load phenotypic information from ResFinder
    cols = ['Gene_accession no.', 'Class']
    phenotype_data = pd.read_csv('phenotypes.txt', sep='\t', usecols=cols)

    # Handling multiple resistance phenotypes
    # Looking for ',' since phenotypes are written "tetracycline, aminoglycosides" etc.
    # e.g. if a gene confers resistance towards more than 1 antibiotic it is called "multiple"
    multiple_resistance_classes = [cls for cls in phenotype_data['Class'].unique() if ',' in cls]

    # Prodigal adds _ and a number to each translated gene name.
    # Here I remove those characters from the gene name so it matches
    # with the one in the ResFinder Phenotype file.
    diamond_out['Phenotype'] = diamond_out['Genes'].apply(lambda x: '_'.join(x.rsplit('_', 1)[0:-1]))

    # Replace phenotype with Multiple if in multiple_resistance_classes list
    phenotype_data.loc[phenotype_data['Class'].isin(multiple_resistance_classes), 'Class'] = 'Multiple'

    # There are some genes which have multiple entries in the Phenotype list
    # Here I make sure that they are classified as Multiple
    duplicated_args = phenotype_data[phenotype_data.duplicated(['Gene_accession no.'])]
    phenotype_data.loc[phenotype_data['Gene_accession no.'].isin(duplicated_args["Gene_accession no."]), 'Class'] = 'Multiple'
    phenotype_data.drop_duplicates(subset=['Gene_accession no.'], inplace=True, keep='first')
    phenotype_data.set_index('Gene_accession no.', inplace=True)

    # Convert phenotype_data to a dict and use it to translate gene names to phenotypes
    phenotype_dict = phenotype_data['Class'].to_dict()
    diamond_out.replace({"Phenotype": phenotype_dict}, inplace=True)
    diamond_out.to_csv('ARG_norm.tsv', sep='\t', index=False)
def main():
    """
    Main function to process files and infer phenotypes.
    """
    modes = ['ResFinder']
    for mode in modes:
        SSU_counts = count_16s_sequences()
        normalized_files = read_and_normalize_files(mode, SSU_counts)
        if mode == 'ResFinder':
            assign_phenotypes_to_args(normalized_files)
        elif mode == 'mobileOG':
            normalized_files.to_csv('MGE_norm.tsv', sep='\t', index=False)


if __name__ == '__main__':
    main()
Flag to specify nr CPUs
None
How to execute script
python3 {PATH_TO_SCRIPT}/{NAME}.py
Input and Output
Note: Do not forget to use the collect operator for the ${sample_id}.tsv and ${sample_id}.level_1.txt inputs.
Input:
phenotypes.txt
${sample_id}.tsv
${sample_id}.level_1.txt
Output:
ARG_norm.tsv
MultiQC
Code
multiqc -f *fastqc.zip
Input and Output
Note: Do not forget to use the collect operator for the *fastqc.zip files.
Input:
*fastqc.zip
Output:
multiqc_report.html
multiqc_data
Flag to specify nr CPUs
None
Where Can You Find All the Necessary Files?
You can locate all the required files for this tutorial at the following link: [Insert Link Here].
Building the Container
Below is the code for the container definition file. Simply copy this code into a file on the cluster and save it as {name}.def. To build the container, execute the command apptainer build {name}.sif {name}.def. Please note, building the container might take some time, so it’s best to start this at the beginning of the tutorial.
Bootstrap: docker
From: ubuntu:23.10
%post
export DEBIAN_FRONTEND=noninteractive
# Update OS and install packages
apt-get update
apt-get install -y \
    git \
    wget \
    original-awk \
    python3.11 \
    python3-pip \
    zlib1g-dev \
    curl \
    cutadapt \
    fastqc \
    mafft \
    hmmer \
    autoconf \
    prodigal
apt-get clean
# Install Nextflow
mkdir Nextflow
cd Nextflow
wget -qO- https://get.nextflow.io | bash
chmod +x nextflow
cd ..
# Install Vsearch
wget https://github.com/torognes/vsearch/archive/v2.25.0.tar.gz
tar xzf v2.25.0.tar.gz
cd vsearch-2.25.0
./autogen.sh
./configure CFLAGS="-O3" CXXFLAGS="-O3"
make
make install
cd ..
rm v2.25.0.tar.gz
# Install cd-hit
wget https://github.com/weizhongli/cdhit/releases/download/V4.8.1/cd-hit-v4.8.1-2019-0228.tar.gz
tar xvf cd-hit-v4.8.1-2019-0228.tar.gz
rm -rf cd-hit-v4.8.1-2019-0228.tar.gz
cd cd-hit-v4.8.1-2019-0228
make
cd cd-hit-auxtools
make
cd ..
cd ..
# install Trim Galore
curl -fsSL https://github.com/FelixKrueger/TrimGalore/archive/0.6.10.tar.gz -o trim_galore.tar.gz
tar xvzf trim_galore.tar.gz
rm trim_galore.tar.gz
# Install Diamond
mkdir Diamond
cd Diamond
wget http://github.com/bbuchfink/diamond/releases/download/v2.1.8/diamond-linux64.tar.gz
tar xzf diamond-linux64.tar.gz
rm -rf diamond-linux64.tar.gz
cd ..
# Install MetaxaQR
git clone https://github.com/bengtssonpalme/MetaxaQR.git
chmod +x MetaxaQR/*
MetaxaQR/metaxaQR_install_database -g SSU -v SILVA138
# Install python packages
python3 -m pip install pandas==2.1.3 biopython==1.81 multiqc==1.18 --break-system-packages
%environment
export LC_ALL=C
export PATH=/MetaxaQR/:$PATH
export PATH=/Nextflow/:$PATH
export PATH=/TrimGalore-0.6.10/:$PATH
export PATH=/Diamond/:$PATH
export PATH=/vsearch-2.25.0/:$PATH
export PATH=/cd-hit-v4.8.1-2019-0228/:$PATH
Let’s Get Started!
Now, it’s time for you to define processes and channels and connect them in a workflow. But first, you need to add a configuration file.
Where is the data?
The input FASTQ files for the pipeline can be found in /cephyr/NOBACKUP/groups/bbt045_2024/turorial_Nextflow/data/.
Adding a Configuration File
A configuration file is essential for running your pipeline in Nextflow. While Nextflow allows extensive customization, we will only cover the basics in this workshop. Combine all the following code snippets into a single document named nextflow.config.
Enable DSL 2
DSL2 is the latest version of the Nextflow language.
nextflow.enable.dsl=2
Parameters
params.files = ""
params.cluster_options = ""
params.container = ""
This setup creates the following flags:
- --files: Specify the location of the FASTQ files.
- --cluster_options: Enter specific options for the cluster. This should be assigned the value "-A C3SE2023-2-17 -p vera".
- --container: Define where the container is located.
These will become variables in your processes and workflows. You can name them as you like. Alternatively, assign values directly here, like params.container = "{PATH}/container.sif".
Profiles
Your configuration will vary depending on whether you’re running Nextflow from the command line or using an sbatch script. Here, we define profiles to instruct Nextflow how to behave in local or Slurm environments. Later on you can use -profile 'local' or -profile 'slurm' to specify the desired environment from the command line or an sbatch script.
Note that you only need one - here. This is because -profile is a pre-defined Nextflow flag. All flags you define through params. require two dashes (--). If you want to read more about how to configure the config file, you can read about it here.
profiles {
    local {
        process.cache = true
        process.container = params.container
        process.executor = 'local'
        apptainer.enabled = true
        apptainer.autoMounts = true
    }

    slurm {
        process.cache = true
        process.container = params.container
        process.executor = 'slurm'
        process.clusterOptions = params.cluster_options
        process.scratch = true
        apptainer.enabled = true
        apptainer.autoMounts = true
    }
}
Start Building the Pipeline
- Save the config file in the same directory and name it nextflow.config. Nextflow will automatically find this file. If the file is located elsewhere, specify its location using the -c flag.
- Begin by creating processes for the tools (Section 6.1) and connect them using a workflow (Section 6). The file containing the processes and workflow should have the .nf extension and be called main.nf.
- Use the publishDir directive to tell Nextflow where to store the pipeline’s output. More about this can be found here, and there is a sketch after this list.
- Test locally with a single file using an interactive Slurm session. Find more information about nextflow run here.
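Here is a hedged sketch of how publishDir could be used, shown on the MultiQC process (the "results" directory is just an example location):

process MultiQC {
    // Copy the final reports out of the work directory into ./results
    publishDir "results", mode: 'copy'

    input:
    path fastqc_zips

    output:
    path "multiqc_report.html"
    path "multiqc_data"

    script:
    """
    multiqc -f *fastqc.zip
    """
}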
If you haven’t specified the values of params.container
and params.files
in the config file, you’ll need to include them in the command line.
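For example, a command along these lines could work; the glob used for --files is an assumption based on the file naming shown earlier (..._R1.fastq.gz / ..._R2.fastq.gz), and {PATH} is a placeholder for wherever you stored the container:

nextflow run main.nf -profile 'local' -resume \
    --files '/cephyr/NOBACKUP/groups/bbt045_2024/turorial_Nextflow/data/*_R{1,2}.fastq.gz' \
    --container {PATH}/container.sif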
On Vera, you can either submit a non-interactive sbatch job to run in the background, or start an interactive session like this (replace {NR_CPU} with the number of CPUs needed and change -t to the estimated duration in HH:MM:SS):
srun -A C3SE2023-2-17 --nodes 1 --ntasks 1 --cpus-per-task {NR_CPU} -t 00:00:00 --pty bash -is
Nextflow is preinstalled on the cluster. All you have to do is load it.
module load Nextflow/23.10.0
To check that it loaded correctly, type:
nextflow -h
If you got the help output, everything worked as expected!
You run the pipeline by executing this command on the command line.
nextflow run main.nf -profile 'local' -resume
The -resume
flag enables Nextflow to start the pipeline from where you last left off. Nextflow automatically caches runs, but it does not automatically resume them unless you use this flag.
Once your pipeline works correctly with one file, try running it with all the FASTQ files in /cephyr/NOBACKUP/groups/bbt045_2024/turorial_Nextflow/data/.
Adding Trace Report and Process Execution Time
Gaining detailed insights into the resource requirements of a process can be very valuable. Questions like how long a process ran, how much RAM it consumed, and whether it fully utilized the allocated CPU cores are crucial. This information can help you benchmark different tools to find the best fit for your needs based on available resources. Moreover, it’s likely you’ll use this information to specify the resource demand for each tool in the next step of this workshop. You can find more about what this report looks like and how to activate it here. Also, don’t forget to activate the timeline report function.
Adding Resource Requirements
Review the tracing
and timeline report
to determine the resource demands of each process. Then, specify the time, cpus, and memory each process needs.
When you specify cpus, you tell Nextflow that the process is supposed to use that number of CPUs. You also need to tell the tool itself how many CPUs to use, which is usually done via a flag. The value you assign to cpus is also saved in a variable called ${task.cpus}. Use this variable after the CPU allocation flag in each script that needs it, so the correct number of CPUs is assigned automatically.
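As an illustration (the tool name and resource values below are made up; take the real numbers from your trace and timeline reports), the directives and ${task.cpus} fit together like this:

process Example_tool {
    // Illustrative resource requests - replace with values from your reports
    cpus 4
    memory '8 GB'
    time '2h'

    input:
    path infile

    output:
    path "out.txt"

    script:
    """
    some_tool --threads ${task.cpus} --input ${infile} --output out.txt
    """
}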
Compare the timeline report before and after you added the resource requirements.
Why is this important? No matter where you run your processes (locally or on a cluster), there will be resource limitations. By telling Nextflow the amount of resources each process requires, it can optimize resource utilization through parallelization.
Submitting a Job Using Slurm
Here’s an sbatch script you’ll need to modify slightly. Replace {NAME}
with a name for your run (the specific name isn’t crucial). Then, indicate the expected duration of the run after the -t
flag (in HH:MM:SS). Remember, the time specified here is for the entire pipeline, not individual processes. Also, replace {PATH}
with where you’d like the stdout and stderr output to go. This will help you monitor the pipeline’s progress and catch any errors.
This sbatch job will submit a separate sbatch job for each process.
#!/usr/bin/env bash
#SBATCH -A C3SE2023-2-17 -p vera
#SBATCH -J {NAME}
#SBATCH -c 1
#SBATCH -t 00:00:00
#SBATCH --error={PATH}/job.%J.err
#SBATCH --output={PATH}/job.%J.out
# Unload unwanted packages and load Nextflow
module purge
module load Nextflow/23.10.0
# Add flags if relevant
nextflow run main.nf -profile 'slurm' -resume
Save this script as {NAME}.sh. To submit the job, enter sbatch {NAME}.sh. To monitor how the job is going, you can open either the {PATH}/job.%J.err or the {PATH}/job.%J.out file. If you want a list of all your submitted jobs, use the command squeue -u {your_username}. If you want information about a specific job, use squeue -j {job_id}.
For this job, I want you to analyze all the FASTQ files in the directory. When it has finished, take a look at the timeline report. Can you now see the value of using Nextflow to parallelize your processes?