In this post, we will create a cluster of NiFi instances. NiFi uses a Zero-Master Clustering paradigm, where each node performs the same tasks on a different set of data. In more detail, when NiFi instances run in cluster mode, one of them is elected Cluster Coordinator (via Apache ZooKeeper). This node is responsible for connecting and disconnecting the other nodes. The other nodes wait until a new FlowFile is sent to a particular Input Port and then carry out the computation.
Let's go through NiFi's terminology. From the Hortonworks documentation:
Nodes: Each cluster is made up of one or more nodes. The nodes do data processing.
NiFi Cluster Coordinator: a node in a NiFi cluster that is responsible for carrying out tasks to manage which nodes are allowed in the cluster and providing the most up-to-date flow to newly joining nodes. When a DataFlow Manager manages a dataflow in a cluster, they are able to do so through the User Interface of any node in the cluster. Any change made is then replicated to all nodes in the cluster.
Primary Node: Every cluster has one Primary Node. On this node, it is possible to run "Isolated Processors". NiFi uses ZooKeeper to elect the Primary Node. If that node disconnects from the cluster for any reason, a new Primary Node will automatically be elected. Users can determine which node is currently elected as the Primary Node by looking at the Cluster Management page of the User Interface.
Flow Election: When a cluster first starts up, NiFi must determine which of the nodes have the “correct” version of the flow. This is done by voting on the flows that each of the nodes has. When a node attempts to connect to a cluster, it provides a copy of its local flow to the Cluster Coordinator. If no flow has yet been elected the “correct” flow, the node’s flow is compared to each of the other Nodes’ flows. If another Node’s flow matches this one, a vote is cast for this flow.
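The voting rule can be sketched in a few lines of Python. This is a toy model, not NiFi's code: each flow is reduced to a SHA-256 fingerprint (a stand-in for NiFi's own flow fingerprinting), every connecting node casts one vote for the flow it reports, and the flow with the most votes wins. According to the NiFi Administration Guide, the real election ends once nifi.cluster.flow.election.max.wait.time has elapsed or nifi.cluster.flow.election.max.candidates nodes have voted; the sketch models only the second condition, through the hypothetical max_candidates parameter. The names fingerprint and elect_flow are illustrative.

```python
from __future__ import annotations

from collections import Counter
from hashlib import sha256


def fingerprint(flow: str) -> str:
    """Reduce a flow definition to a comparable key (a stand-in for NiFi's own fingerprint)."""
    return sha256(flow.encode("utf-8")).hexdigest()


def elect_flow(node_flows: dict[str, str], max_candidates: int | None = None) -> str:
    """Elect the flow reported by the most nodes.

    node_flows maps node name -> flow definition, in the order the nodes connect.
    Voting stops once max_candidates nodes have voted (hypothetical stand-in for
    nifi.cluster.flow.election.max.candidates); the wait-time limit is not modeled.
    """
    votes: Counter[str] = Counter()
    flows_by_key: dict[str, str] = {}
    for voters, flow in enumerate(node_flows.values(), start=1):
        key = fingerprint(flow)
        flows_by_key.setdefault(key, flow)
        # A matching flow gains a vote; a new flow enters the pool with one vote.
        votes[key] += 1
        if max_candidates is not None and voters >= max_candidates:
            break
    if not votes:
        raise ValueError("no node has reported a flow yet")
    # most_common breaks ties in favor of the flow that was reported first.
    winner, _ = votes.most_common(1)[0]
    return flows_by_key[winner]
```

For example, elect_flow({"nifi_1": "flow-A", "nifi_2": "flow-A", "nifi_3": "flow-B"}) returns "flow-A", because two of the three nodes agree on it. In this sketch, ties go to the flow reported first.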
For this experiment, we will use some existing Docker images:
- NiFi: apache/nifi:latest
- ZooKeeper: bitnami/zookeeper:latest
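To start the cluster, run the following command from the folder containing the docker-compose.yml file, replacing <Number_of_nodes> with the number of NiFi nodes you want:
docker-compose up --scale nifi=<Number_of_nodes> -d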
In our tests, we will use an environment with 3 instances:
docker-compose up --scale nifi=3 -d
After a while, we can watch the cluster being prepared by opening http://localhost:32770/nifi. The host port (32770 here) is the one Docker assigned to one of the nodes, so it will likely differ on your machine; I suggest using Kitematic (or docker-compose ps) to find it.
At this moment, the cluster is electing the official flow and the Cluster Coordinator. This operation can take a few minutes (it depends on the configuration, for example on the maximum flow election wait time), but in the end, we can see NiFi's cluster up.
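To check from a script that all nodes have joined, you can ask any node for its view of the cluster instead of opening the UI. The sketch below assumes an unsecured node (as in this setup, which only sets NIFI_WEB_HTTP_PORT) and calls the GET /nifi-api/controller/cluster endpoint of the NiFi REST API, which returns each node's address, API port, connection status, and roles such as "Primary Node" or "Cluster Coordinator". The helper names fetch_cluster, summarize_cluster, and all_connected are hypothetical.

```python
from __future__ import annotations

import json
from urllib.request import urlopen


def fetch_cluster(base_url: str, timeout: float = 10.0) -> dict:
    """Fetch the cluster view from an unsecured NiFi node's REST API."""
    url = f"{base_url.rstrip('/')}/nifi-api/controller/cluster"
    with urlopen(url, timeout=timeout) as response:
        return json.load(response)


def summarize_cluster(payload: dict) -> list[tuple[str, str, list[str]]]:
    """Return (address:apiPort, status, sorted roles) for each node in the payload."""
    summary = []
    for node in payload["cluster"]["nodes"]:
        endpoint = f"{node['address']}:{node['apiPort']}"
        summary.append((endpoint, node["status"], sorted(node.get("roles", []))))
    return summary


def all_connected(payload: dict) -> bool:
    """True when every node reports the CONNECTED status."""
    return all(status == "CONNECTED" for _, status, _ in summarize_cluster(payload))
```

For example, summarize_cluster(fetch_cluster("http://localhost:32770")), using whatever host port Docker mapped on your machine, should list three CONNECTED nodes once the election is over and show which node holds each role.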
In NiFi's menu, you can now see a new entry called Cluster, which shows the following:
Now, let's go a bit deeper into the configuration. In the docker-compose.yml file, we can see NiFi's cluster properties:
NIFI_WEB_HTTP_PORT=8080
NIFI_CLUSTER_IS_NODE=true
NIFI_CLUSTER_NODE_PROTOCOL_PORT=8082
NIFI_ZK_CONNECT_STRING=zookeeper:2181
NIFI_ELECTION_MAX_WAIT=1 min
- nifi.cluster.is.node=true (set through NIFI_CLUSTER_IS_NODE): this instance runs as a node of a cluster.
- nifi.zookeeper.connect.string (set through NIFI_ZK_CONNECT_STRING): the Connect String, a comma-separated list of the host:port pairs of the Apache ZooKeeper instances (in our case, just zookeeper:2181).
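NiFi itself does not read these environment variables: the start.sh script of the apache/nifi image copies them into nifi.properties when the container starts. The sketch below shows that translation for the five variables used here. The mapping is based on the image's start script and may change between image versions (check start.sh for the version you run); ENV_TO_PROPERTY and to_properties are hypothetical names. Each entry is split on its first = only, so values containing spaces, such as 1 min, survive intact.

```python
from __future__ import annotations

# Environment variables used in our docker-compose.yml and the nifi.properties
# keys that the apache/nifi image's start.sh writes them to (verify per version).
ENV_TO_PROPERTY = {
    "NIFI_WEB_HTTP_PORT": "nifi.web.http.port",
    "NIFI_CLUSTER_IS_NODE": "nifi.cluster.is.node",
    # Port used for cluster protocol traffic, e.g. heartbeats to the coordinator.
    "NIFI_CLUSTER_NODE_PROTOCOL_PORT": "nifi.cluster.node.protocol.port",
    "NIFI_ZK_CONNECT_STRING": "nifi.zookeeper.connect.string",
    # Maximum time the cluster waits for votes during Flow Election.
    "NIFI_ELECTION_MAX_WAIT": "nifi.cluster.flow.election.max.wait.time",
}


def to_properties(env: list[str]) -> dict[str, str]:
    """Translate KEY=value entries (as in a compose environment list) to nifi.properties entries."""
    props = {}
    for entry in env:
        key, _, value = entry.partition("=")  # split on the first '=' only
        if key in ENV_TO_PROPERTY:
            props[ENV_TO_PROPERTY[key]] = value
    return props
```

For example, to_properties(["NIFI_ELECTION_MAX_WAIT=1 min"]) returns {"nifi.cluster.flow.election.max.wait.time": "1 min"}.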
For a more detailed discussion of NiFi's properties, you can refer to the Hortonworks website.
If you open NiFi's log (nifi-app.log), you can see the election of the Cluster Coordinator, the moment when a node joins the cluster, and the heartbeat that is sent every 5 seconds:
2019-03-02 16:45:25,156 INFO [Clustering Tasks Thread-2] o.a.n.c.c.ClusterProtocolHeartbeater Heartbeat created at 2019-03-02 16:45:25,144 and sent to 14476ee0dea9:8082 at 2019-03-02 16:45:25,156; send took 11 millis
2019-03-02 16:45:30,204 INFO [Clustering Tasks Thread-3] o.a.n.c.c.ClusterProtocolHeartbeater Heartbeat created at 2019-03-02 16:45:30,158 and sent to 14476ee0dea9:8082 at 2019-03-02 16:45:30,204; send took 44 millis
2019-03-02 16:45:35,222 INFO [Clustering Tasks Thread-1] o.a.n.c.c.ClusterProtocolHeartbeater Heartbeat created at 2019-03-02 16:45:35,208 and sent to 14476ee0dea9:8082 at 2019-03-02 16:45:35,222; send took 13 millis
2019-03-02 16:45:40,232 INFO [Clustering Tasks Thread-2] o.a.n.c.c.ClusterProtocolHeartbeater Heartbeat created at 2019-03-02 16:45:40,223 and sent to 14476ee0dea9:8082 at 2019-03-02 16:45:40,232; send took 9 millis
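To check the heartbeat period without reading the log entry by entry, you can extract the timestamps with a regular expression. The pattern below is written against the ClusterProtocolHeartbeater entries shown above and may need adjusting for other NiFi versions; heartbeat_stats is a hypothetical helper. The 5-second spacing corresponds to the nifi.cluster.protocol.heartbeat.interval property (default 5 sec), and the target host:port is the Cluster Coordinator's protocol port (8082 in our setup).

```python
from __future__ import annotations

import re
from datetime import datetime

# Matches the ClusterProtocolHeartbeater INFO entries shown above.
HEARTBEAT = re.compile(
    r"Heartbeat created at (?P<created>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) "
    r"and sent to (?P<target>\S+) at \S+ \S+; send took (?P<took>\d+) millis"
)
TS_FORMAT = "%Y-%m-%d %H:%M:%S,%f"  # %f accepts the 3-digit millisecond field


def heartbeat_stats(log_text: str) -> tuple[list[float], list[int], set[str]]:
    """Return (seconds between consecutive heartbeats, send times in ms, targets)."""
    created: list[datetime] = []
    took: list[int] = []
    targets: set[str] = set()
    for match in HEARTBEAT.finditer(log_text):
        created.append(datetime.strptime(match["created"], TS_FORMAT))
        took.append(int(match["took"]))
        targets.add(match["target"])
    intervals = [(later - earlier).total_seconds() for earlier, later in zip(created, created[1:])]
    return intervals, took, targets
```

On the four log entries above, it returns intervals of about 5.01 to 5.05 seconds, send times of 11, 44, 13, and 9 ms, and a single target, 14476ee0dea9:8082.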
Play with NiFi’s cluster
Ok, we have a NiFi cluster with 3 nodes, and the second one is both Primary Node and Cluster Coordinator. Now, let's create a very simple template in which a large number of files is created on the Primary Node and each file is saved locally on one node of the cluster. To do this, we have to use NiFi's RPG (Remote Process Group). A Remote Process Group looks like a normal Process Group, but it references a remote instance of NiFi. This allows you to transfer data from one node (usually the Primary) to the whole cluster in order to balance the processing.
In more detail, when data is transferred to the cluster, the RPG contacts the configured URL to determine the cluster's state and the number of nodes. This information is used to balance the data pushed to each remote node. You can configure the RPG to balance the load based on data size, number of FlowFiles, and duration.
Creating the template
The template is the following (as usual, you can download it here):
From left to right and top to bottom, we have the following components:
- A GenerateFlowFile processor, which simulates the creation of a large number of FlowFiles with custom text
- The Remote Process Group (RPG), which balances the computation
- An input port, called Files, to which the RPG sends the data
- A PutFile processor, which stores the data received from the RPG on the local file system
Let's start with the configuration of the GenerateFlowFile processor. The image below shows its properties:
Then, in the Scheduling tab, we set the Execution option to Primary node, so that only the Primary Node generates FlowFiles.
Why? Because we want only one node to create the data; otherwise, every node in the cluster would run the processor and generate its own files. The same approach applies when you fetch data from a file system, a database, or other sources. Now, let's look at the configuration of the RPG.
First, you have to set the URL of at least one node in the NiFi cluster (you can also enter the whole list, separated by commas). Next, you have to choose the input port to which the Primary Node will send the FlowFiles destined for the other nodes. You can do this from the RPG menu entry called Manage remote ports.
Now, by clicking the pencil icon next to the Files port, we can set the balancing properties:
In our demo, we set Concurrent Tasks to 3 and the Batch Settings Count to 10, so the RPG switches to a different remote node every 10 FlowFiles. This is the granularity of our balancing: the RPG chooses a node, sends it 10 FlowFiles, and then chooses another one. The result is not a perfect balance, but it is less onerous for NiFi. You can get the most even balance by setting the count to 1.
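The effect of the batch count can be estimated with an idealized model in which the RPG visits the nodes in a fixed cycle and sends a full batch to each in turn. This is an assumption made for illustration: the real RPG weights its choice of node by the load each node reports and uses several concurrent tasks, none of which this sketch models. The function batch_round_robin and the node names are hypothetical.

```python
from __future__ import annotations

from collections import Counter
from itertools import cycle


def batch_round_robin(total_flowfiles: int, nodes: list[str], batch_count: int) -> Counter[str]:
    """Count FlowFiles per node when nodes take turns receiving batch_count FlowFiles.

    Idealized model: nodes are visited in a fixed cycle. The real RPG weights its
    choice by the load each node reports and runs several concurrent tasks.
    """
    if batch_count < 1:
        raise ValueError("batch_count must be at least 1")
    received: Counter[str] = Counter()
    remaining = total_flowfiles
    for node in cycle(nodes):  # an empty node list ends the loop immediately
        if remaining <= 0:
            break
        sent = min(batch_count, remaining)  # the last batch may be partial
        received[node] += sent
        remaining -= sent
    return received
```

For example, with a hypothetical run of 95 FlowFiles and a count of 10, the first node receives 35 FlowFiles and the other two 30 each, because the first node also gets the tenth, partial batch; with a count of 1, the split becomes 32, 32, and 31.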
Finally, you have to enable the input port:
We are near the end: let's configure the PutFile processor. The configuration is very simple; just set the Directory property to the output folder where the files will be written (as shown in the image below):
Remote Process Group in action
Everything is ready: let's start every processor, enable transmission on the RPG, and wait a few seconds. Afterwards, I suggest stopping the GenerateFlowFile processor to avoid creating millions of files.
After a few minutes, if you connect to one of the NiFi nodes, you can see the list of processed FlowFiles:
Well, it seems to work, but how has NiFi balanced the FlowFiles? As the images below show, the RPG automatically distributed the files among the 3 nodes.
In particular, the first node handled more FlowFiles, while the other two processed the same number. This happens because we set a batch count of 10 FlowFiles in the RPG. Nevertheless, the load is fairly well distributed among the three nodes.
That's all for now. It may seem that we have not done anything particularly interesting, but it is easy to imagine how many possibilities this technique opens up. Out of the box, we have a simple way to distribute the processing of files among different machines using common NiFi processors. Moreover, we can balance the processing in different ways. Of course, there are many configuration details for a real production environment that I chose to leave out to keep this post simple. But the Internet is your friend, and you can start from the following links: here, here, and here.