Post data to NIFI using PowerShell, or more broadly, use PowerShell to post data to a URL using REST.

Today I want to show how I can use PowerShell to iterate through a directory and send the files in it to a NIFI instance for further processing.

I wanted to use Windows as my operating system and create a simple PowerShell script that accomplishes this with PowerShell's Invoke-RestMethod cmdlet.

I also wanted to send headers with my POST request so that I can use them within my NIFI instance to make routing and processing decisions.

For instance, I wanted to assign my data a site ID and a FEED name. I can send those attributes with my POST request and then build a data flow pipeline that sends data from certain site IDs down one data flow path, say HADOOP (NIFI DOC).

Below is my PowerShell script.

For the latest copy, please get it from GITHUB.

Here is an example:

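# Placeholder values; replace them with your own.
$certPath = "c:\path\to\cert\file.cer"
$siteId = "020f9730-4f3e-440a-bad6-a3e2654250f4"
$feed = "logs"
$uri = "URL/STRING"
$dataDirectory = "c:\path\to\your\directory"
# -Certificate expects a certificate object, not a file path.
$cert = Get-PfxCertificate -FilePath $certPath
# -File skips subdirectories.
$files = Get-ChildItem -Path $dataDirectory -File
foreach ($file in $files)
{
    # These headers become attributes on the NIFI FlowFile.
    $headers = @{ "site.id" = $siteId; "filename" = $file.Name; "feed" = $feed }
    Invoke-RestMethod -Uri $uri -Method Post -Headers $headers -InFile $file.FullName -Certificate $cert
}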

Once the data is posted to NIFI, the FlowFile retains the headers I passed with my POST request as NIFI attributes named http.headers.{header}.

The documentation is found here: HandleHttpRequest
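
To preview which attributes to expect on the FlowFile, the mapping can be sketched in PowerShell. This is only an illustration: Get-ExpectedNifiAttribute is a hypothetical helper (it is not part of NIFI or PowerShell), the file name app.log is made up, and the http.headers. prefix is an assumption taken from the HandleHttpRequest documentation, so check it against your NIFI version.

```powershell
# Hypothetical helper: previews the attribute names a header hashtable
# would produce, assuming NIFI prefixes each header with "http.headers.".
function Get-ExpectedNifiAttribute {
    param(
        [Parameter(Mandatory)] [hashtable] $Headers,
        [string] $Prefix = "http.headers."
    )
    $attributes = [ordered]@{}
    # Sort the header names so the preview is stable between runs.
    foreach ($name in ($Headers.Keys | Sort-Object)) {
        $attributes["$Prefix$name"] = [string]$Headers[$name]
    }
    return $attributes
}

# Same header names as the script above; "app.log" is a made-up file name.
$headers = @{
    "site.id"  = "020f9730-4f3e-440a-bad6-a3e2654250f4"
    "filename" = "app.log"
    "feed"     = "logs"
}
$expected = Get-ExpectedNifiAttribute -Headers $headers
$expected.GetEnumerator() | ForEach-Object { "{0} = {1}" -f $_.Key, $_.Value }
```

Running it prints one line per attribute, for example the site ID under http.headers.site.id, which is the name to reference later when routing inside NIFI.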

EXAMPLE:

The power of the “advanced” button on the NIFI UpdateAttribute processor.

The UpdateAttribute processor is used to manipulate NIFI attributes. [1]

In its basic form, you can add attributes from within the properties of the processor. Every FlowFile that goes through the processor will get updated with what you’ve configured in it.

For instance below:

UpdateAttributeEX1

Within the properties of the UpdateAttribute processor, I’ve configured it to add 2 attributes (MyAttribute1 / MyAttribute2), with their respective values, to every FlowFile that goes through it.

I can use those attributes in any property that accepts the NIFI Expression Language.

regularExpressionAccept

Expression Language support is usually shown when hovering over the question mark next to a processor’s property. We can also find out whether a processor property accepts the Expression Language from the Apache NIFI online documentation [2], or from the help at the top right of your NIFI install.

HelpExample

Conditional attribute enhancement [IF,IF,…ELSE]

Let’s take a use case in which we are getting data from lots of sources and, depending on where it comes from, we want to handle it differently or set the attributes required for further downstream processing.

For this example, we are going to get data from EDGE site 1 / EDGE site 2 / EDGE site 3. We will tag each feed with a NIFI.FEED identifier based on a condition, and tag every FlowFile with an identifier that tells us which NIFI host processed the data. In addition:

  • For EDGE site 1 we want to grab the data, binary merge it, and then send it to Hadoop, saving it under the directory tree /data/${FEED}/YYYY-MM-dd-hh-mm/${filename}
  • For EDGE site 2 we want to enrich the data, so we send it to local disk and change the filename to YYYY-MM-dd-hh-mm-$OriginalFileName-$UUID
  • For EDGE site 3 we want to SFTP the data
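
The two naming patterns in the list above can be illustrated with a short PowerShell sketch. Inside NIFI these names would be built with NIFI expressions, not PowerShell; the helpers Get-HadoopPath and Get-LocalFileName are hypothetical, the sample values are made up, and I am assuming the timestamp uses a 24-hour clock.

```powershell
# Hypothetical helpers that only illustrate the resulting names.
# Assumption: the YYYY-MM-dd-hh-mm pattern means a 24-hour timestamp.
$stampFormat = "yyyy-MM-dd-HH-mm"

function Get-HadoopPath {
    param([string] $Feed, [string] $FileName, [datetime] $Timestamp)
    $stamp = $Timestamp.ToString($stampFormat, [cultureinfo]::InvariantCulture)
    # EDGE site 1 layout: /data/<feed>/<timestamp>/<filename>
    return "/data/$Feed/$stamp/$FileName"
}

function Get-LocalFileName {
    param([string] $FileName, [datetime] $Timestamp, [guid] $Uuid)
    $stamp = $Timestamp.ToString($stampFormat, [cultureinfo]::InvariantCulture)
    # EDGE site 2 layout: <timestamp>-<original filename>-<uuid>
    return "$stamp-$FileName-$Uuid"
}

# Made-up sample values.
$when = [datetime]::new(2024, 1, 15, 9, 30, 0)
Get-HadoopPath -Feed "logs" -FileName "app.log" -Timestamp $when
Get-LocalFileName -FileName "app.log" -Timestamp $when -Uuid ([guid]::NewGuid())
```

With these sample values the first call returns /data/logs/2024-01-15-09-30/app.log, and the second returns the same timestamp followed by the original filename and a freshly generated UUID.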

We can set up the framework to route these FlowFiles based on conditions (“IF statements”) in the ADVANCED section of our UpdateAttribute processor.

Like so:

EDGE DEVICE 1

Condition1

EDGE DEVICE 2

Condition2

EDGE DEVICE 3

Condition3

Now, FlowFiles that pass through this UpdateAttribute processor and satisfy the conditions set in the conditions section of the advanced tab will be enriched with the attributes set in the actions section of the advanced tab.

Note that if no condition sets an attribute that is also defined at the parent level of the UpdateAttribute processor, the parent-level value is applied globally; in this case, NIFI.HOST is set to ${hostname(true)}.

We can then use the RouteOnAttribute processor to make routing decisions, which I will discuss in a future blog post.

Note also that we have a choice of routing strategy: CLONE or ORIGINAL.

CLONE will make a copy of the FlowFile for each rule whose conditions are met.

ORIGINAL will keep one FlowFile and iterate it through the list of rules. If several matching rules set the same attribute, the last one wins, so order matters; you can drag your rules around to reorder them.
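
The “last one wins” behaviour can be modelled with a small PowerShell sketch. This is a simulation, not NIFI code: the function Invoke-RulesOnOriginal, the rule layout, the site attribute, and the NIFI.FEED values are all hypothetical, and I am assuming each condition is evaluated against the incoming attributes.

```powershell
# Simplified model of the ORIGINAL strategy: one attribute set walks through
# the rules in order, and later matching rules overwrite earlier values.
function Invoke-RulesOnOriginal {
    param(
        [hashtable] $Attributes,
        [object[]] $Rules
    )
    $result = $Attributes.Clone()
    foreach ($rule in $Rules) {
        # Assumption: conditions are checked against the incoming attributes.
        if (& $rule.Condition $Attributes) {
            foreach ($key in $rule.Actions.Keys) {
                $result[$key] = $rule.Actions[$key]
            }
        }
    }
    return $result
}

# Two hypothetical rules that both match and both set NIFI.FEED.
$generic = @{
    Condition = { param($a) $a["site"] -like "edge*" }
    Actions   = @{ "NIFI.FEED" = "generic" }
}
$specific = @{
    Condition = { param($a) $a["site"] -eq "edge1" }
    Actions   = @{ "NIFI.FEED" = "edge1-feed" }
}
$flowFile = @{ "site" = "edge1" }

(Invoke-RulesOnOriginal -Attributes $flowFile -Rules @($generic, $specific))["NIFI.FEED"]
(Invoke-RulesOnOriginal -Attributes $flowFile -Rules @($specific, $generic))["NIFI.FEED"]
```

The first call prints edge1-feed and the second prints generic: the same two rules give a different result depending only on their order.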

Official advanced documentation found here. [3]