In this post, I will introduce a very useful tool for pipeline management called Apache NiFi.
It was 2004, and at the NSA (National Security Agency) three talented engineers built a new platform for handling, processing, and distributing data through different pipelines. The platform was designed to be extensible, allowing new data sources and processing tools, called processors, to be plugged in, and it included a GUI to make it simple to use. After ten years of hard work, in November 2014, the platform was donated to the Apache Software Foundation. This, in short, is the history of Apache NiFi.
To understand how to use Apache NiFi, it is important to grasp a few core concepts.
The first one is the FlowFile, the heart of Apache NiFi. A FlowFile consists of content and metadata. While the content is simply the data, the payload used for the computation, the metadata is a list of attributes (key/value pairs). FlowFiles are processed by different blocks called processors. Moreover, Apache NiFi stores information about the history of each FlowFile in a record called Provenance. FlowFiles are persisted inside three different local folders, called repositories. These repositories are:
- FlowFile Repository: which contains metadata for all the current FlowFiles in the flow
- Content Repository: which holds the content for current and past FlowFiles
- Provenance Repository: which holds the history of each FlowFile, i.e., the processors that have processed it
The data (and possibly the metadata) inside the repositories are computed by one processor and then passed on to other processors. A relationship is a NiFi concept that defines the routes to which a FlowFile may be transferred. A processor has one or more relationships; typical examples are success, failure, and matched. Of course, the above concepts don't describe NiFi in depth; more information is available here.
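As a mental model (not NiFi's actual internal classes), a FlowFile can be pictured as content plus attributes, and a relationship as a named route out of a processor. Here is a minimal Python sketch of that idea; the `route` function is an invented toy processor:

```python
from dataclasses import dataclass, field


@dataclass
class FlowFile:
    # The payload: raw bytes used for the computation
    content: bytes
    # Metadata: key/value attributes describing the payload
    attributes: dict = field(default_factory=dict)


def route(flowfile: FlowFile) -> str:
    """A toy processor: returns the name of the relationship to follow."""
    return "success" if flowfile.content else "failure"


ff = FlowFile(content=b'{"text": "hello"}', attributes={"filename": "tweet-1.json"})
print(route(ff))  # success
```

In real NiFi the routing happens inside each processor's implementation, and the GUI only lets you connect each named relationship to a downstream processor.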
After this brief introduction, let's create a very simple example pipeline that performs the following steps:
- Crawl data from Twitter using a hashtag
- Save the whole tweet in a directory called raw
- Extract the tweet's text
- Save the tweet's text in a directory called contents
- Save any malformed tweets in a folder called malformed
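To make the flow concrete before building it in NiFi, here is a minimal Python sketch of the same logic outside NiFi (the function name, file names, and sample tweets are invented for illustration; the Twitter download step is left out):

```python
import json
import os


def process_tweet(raw_json, tweet_id, base="."):
    """Simulate the pipeline for one tweet: save raw, extract text, route."""
    # 1. Save the whole tweet in the "raw" directory
    os.makedirs(os.path.join(base, "raw"), exist_ok=True)
    with open(os.path.join(base, "raw", f"{tweet_id}.json"), "w") as f:
        f.write(raw_json)

    try:
        # 2. Extract the tweet's text (the equivalent of JsonPath $.text)
        text = json.loads(raw_json)["text"]
    except (json.JSONDecodeError, KeyError):
        # 3b. Malformed tweet: keep the original payload in "malformed"
        os.makedirs(os.path.join(base, "malformed"), exist_ok=True)
        with open(os.path.join(base, "malformed", f"{tweet_id}.json"), "w") as f:
            f.write(raw_json)
        return None

    # 3a. Save the extracted text in "contents"
    os.makedirs(os.path.join(base, "contents"), exist_ok=True)
    with open(os.path.join(base, "contents", f"{tweet_id}.txt"), "w") as f:
        f.write(text)
    return text
```

In NiFi, each of these steps becomes a processor and each `return` path becomes a relationship between processors.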
First of all, you need a Twitter Application; please refer to this link for creating one. It is out of the scope of this post to describe how to configure Apache NiFi, as it is a complex task that requires experience. However, for this demo the default configuration of the NiFi Docker image, available here, is good enough.
As described above, Apache NiFi comes with different processors; these are the building blocks of our pipeline. In this little demo, I will use the following ones:
- GetTwitter: a processor able to download tweets using user IDs or search terms
- EvaluateJsonPath: a processor able to evaluate one or more JsonPath expressions against the content of a FlowFile
- PutFile: a processor able to write the contents of a FlowFile to the local file system
Let’s start with a high-level screenshot of our pipeline inside the Apache NiFi GUI.
From left to right, we can see the processor “Download Tweets”, which is an instance of GetTwitter configured to retrieve tweets containing the term “trump”. With this processor, each tweet becomes a single FlowFile. The next picture shows how to configure this processor.
A common best practice in Data Management suggests storing the raw data. We can use the PutFile processor to store it in a local folder at the path “/home/nifi/raw”.
Now, we can extract the text from the tweet. We can accomplish this using the EvaluateJsonPath processor. The configuration is very simple: you have to add a property named content with the value $.text using the “+” button.
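The JsonPath expression $.text simply selects the top-level text field of the tweet’s JSON. The equivalent lookup in plain Python, on a trimmed, invented sample of a tweet payload, looks like this:

```python
import json

# A trimmed, invented sample of a tweet payload
tweet = json.loads('{"id": 1, "text": "Hello NiFi", "user": {"name": "alice"}}')

# The JsonPath expression $.text selects the top-level "text" key
content = tweet["text"]
print(content)  # Hello NiFi
```

In NiFi, the extracted value becomes the content of a new FlowFile, since the processor’s Destination property defaults to routing results into the FlowFile content.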
With the above configuration, the EvaluateJsonPath processor will extract the text field from the tweet. The processor handles three different cases:
- The tweet has the text field
- The tweet does NOT have the text field
- An error occurs during processing
In the first case, a new FlowFile with the text as its content will be created. This FlowFile will be passed to the PutFile processor called “Save tweet content” via the matched relationship. This last processor will save the FlowFile in the contents folder.
In the second case, which represents a malformed tweet, the original FlowFile will be routed via the unmatched relationship and saved in the malformed folder by the PutFile processor called “Save Tweets with error”. In the last case, the least frequent one, the pipeline behaviour is the same, but the EvaluateJsonPath processor will route the FlowFile via the failure relationship.
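The three outcomes can be summarised as a rough simulation of the routing decision (a sketch of the idea, not NiFi’s actual implementation; the function name is invented):

```python
import json


def evaluate_json_path(payload):
    """Return the name of the relationship a $.text extraction would follow."""
    try:
        doc = json.loads(payload)
    except json.JSONDecodeError:
        return "failure"    # an error occurred during processing
    # matched: the text field exists; unmatched: it is missing
    return "matched" if "text" in doc else "unmatched"


print(evaluate_json_path('{"text": "hi"}'))  # matched
print(evaluate_json_path('{"id": 1}'))       # unmatched
print(evaluate_json_path('not json'))        # failure
```

Each returned name corresponds to one of the connections drawn in the NiFi GUI towards the two PutFile processors.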
That’s all for now; you can download the template from my GitHub at this link.