Demystifying IBM Streams


The aim of this blog is to help you build a picture of how to develop SPL programs in IBM Streams. In this blog, you will learn about —

  • Data Streaming and the need for real-time decision systems.
  • IBM Streams and its toolkits.
  • Some SPL code.

Let us take a deep dive into the ocean of IBM Streams.

50m Atlantis 1 Submarine — Streaming Analytics


Streaming Analytics (also referred to as Stream Processing) is the method of processing large volumes of data (streams) in flight. These data streams are uninterrupted flows of very long sequences of data. The power of streaming analytics is that it permits the processing of millions of events per second and lets us build applications that support real-time decision making.

Streaming data might comprise log files generated by e-commerce purchases, in-game player activity, information from social networks, financial stock market feeds, or even network logs from devices.

By the year 2021, the global IP traffic will reach 3.3 ZettaBytes (ZB) per year – Cisco

As hardware gets cheaper and devices get smarter, the volume of data being sent and received is growing dramatically. Faced with this growing volume of constantly changing data, today’s firms are challenged to make informed real-time business decisions and stay ahead of the competition, without first having to cache the data in Hadoop systems.


300m Scuba Divers — What is IBM Streams?

IBM Streams (formerly IBM InfoSphere Streams) is a proprietary product comprising a development platform and a runtime environment that enables organizations to develop and execute applications that ingest, filter, and analyze enormous volumes of streaming data.

It uses its own Streams Processing Language (SPL) and can be extended with C, C++, or Java applications to reuse existing logic. It includes an Eclipse-based IDE, application graphs, and runtime monitoring to simplify the process of building and managing applications.

Below is the simplest possible SPL program, our all-time favorite Hello World application —

namespace sample;

composite HelloWorld {
    graph
        // Beacon generates 5 tuples, each carrying the message attribute set below
        stream<rstring message> Hello = Beacon() {
            param
                iterations: 5u;
            output
                Hello: message = "Hello World!";
        }

        // Custom consumes the Hello stream and prints each tuple's message
        () as Sink = Custom(Hello) {
            logic
                onTuple Hello:
                    printStringLn(message);
        }
}
A few takeaways —
  • Stream
    • A stream is a sequence of tuples, and tuples are collections of attributes (e.g. rstring message). A stream can be an infinite sequence of tuples, such as a stream from a network sensor.
    • Alternatively, a stream can be finite, such as a stream created from the contents of a file.
  • Tuple
    An individual piece of data in a stream. This data can be structured or unstructured.
  • Operator
    • Operators are the basic building blocks of a stream processing application. They process tuples from an incoming stream and produce an output stream.
    • Operators process an input stream in one of the following logical modes.
      • As each tuple is received.
      • As a window.
    • An operator can also manipulate the tuple data from the input stream and produce the results in the form of an output stream.
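
As a hedged sketch of the windowed mode, here is how the standard toolkit’s Aggregate operator could compute a result over a tumbling window of ten tuples. The Quotes stream and its symbol and price attributes are illustrative assumptions, not part of the Hello World example —

// Assumed input: stream<rstring symbol, float64 price> Quotes
// Emits one tuple per group of 10 input tuples, carrying their average price
stream<rstring symbol, float64 avgPrice> AvgPrice = Aggregate(Quotes) {
    window
        Quotes : tumbling, count(10) ;
    output
        AvgPrice : avgPrice = Average(Quotes.price) ;
}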

On successful compilation of this SPL code, a .sab file is produced. A Streams Application Bundle (.sab) file is a single, relocatable file that contains all the artifacts needed to run your application. When it is submitted for execution, the application bundle file is unbundled into a runtime application.
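
Assuming a standard IBM Streams installation, compiling and submitting the example might look like the following; the output path and file name are what the compiler typically produces by default —

sc -M sample::HelloWorld                           # compile the main composite into a .sab bundle
streamtool submitjob output/sample.HelloWorld.sab  # submit the bundle to a running Streams instance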


1000m Giant Pacific Octopus — Counting Lines in All Files of a Specified Directory, Using a Pattern to Match File Types

composite LineCounter {
	type
		LineCountType = rstring fileName, int32 lineCount ;
	graph
		(stream<rstring fileName> MyFileName ) = DirectoryScan() {
			param
				directory : getSubmissionTimeValue("Dir") ;
				ignoreDotFiles:true ;
				sleepTime:10.0 ;
				pattern: ".*\\..*" ; // regex: match only file names that contain a dot (i.e. have an extension)
		}

		(stream<rstring line, rstring fileName> Line) = FileSource(MyFileName) {
			param
				format:line ;
			
			output
				Line: fileName = FileName() ; 
		}

		(stream<LineCountType> LineCount ) = Custom(Line) {
			logic
				state: {
					mutable int32 _lineCnt = 0 ;
					mutable rstring _fileName = "" ;
				}
				
				onTuple Line : {
					_lineCnt++ ;
					_fileName = Line.fileName ;
				}
				
				// FileSource emits a window punctuation marker at the end of each file
				onPunct Line: {
					if (currentPunct() == Sys.WindowMarker) {
						mutable LineCountType oTuple ={} ;
						oTuple.fileName =_fileName ;
						oTuple.lineCount = _lineCnt ;
						submit(oTuple, LineCount) ;
						_lineCnt=0 ;
						_fileName="" ;		
					}
				}
		}
		
		stream<LineCountType> outputStream = Throttle(LineCount){
		    param
		        rate: 2.0 ;
		}

		() as FileSink_4 = FileSink(outputStream){
			param
				file : "/dev/stdout" ;
				flush: 1u ;
		}
}

This uses a Throttle operator, which emits the resulting tuples at a rate of two per second (rate: 2.0). Typically, this operator is used to pace a stream to flow at a specified rate.


3800m Titanic Shipwreck — Performance

The PacketLiveSource operator in the Network toolkit has a performance of nearly 1M packets per second and the DNSMessageParser can parse 600K packets per second per core as an independent operator.

It is insane how much horsepower IBM Streams has.


6000m MIR DSV — Network Toolkit

This toolkit permits us to analyze low-level network packets: parsing DHCP, DNS, and Netflow traffic, and enriching IPv4 and IPv6 addresses with geospatial data.

stream<PacketType> PacketLiveStream as PacketsOut = PacketLiveSource() {
    param
        networkInterface : $networkInterface ;
        promiscuous : true ;
    output
        PacketsOut :
            captureTime = (float64) CAPTURE_SECONDS() + (float64) CAPTURE_MICROSECONDS() / 1000000.0,
            ipSourceAddress = convertIPV4AddressNumericToString(IPV4_SRC_ADDRESS()),
            ipDestinationAddress = convertIPV4AddressNumericToString(IPV4_DST_ADDRESS()),
            dnsMessage = PAYLOAD_DATA() ;
}

The PacketLiveSource operator captures live network packets from an Ethernet interface.

stream<DNSMessageType> DNSMessageStream as Out = DNSMessageParser(PacketLiveStream) {
    logic
        state : {
            map<uint16, rstring> dnsTypes = { 1 : "A", 2 : "NS", 3 : "MD", 4 : "MF",
                5 : "CNAME", 6 : "SOA", 7 : "MB", 8 : "MG", 9 : "MR", 10 : "NULL",
                11 : "WKS", 12 : "PTR", 13 : "HINFO", 14 : "MINFO", 15 : "MX",
                16 : "TXT", 28 : "AAAA" } ;
            map<uint8, rstring> dnsResponseCodes = { 0 : "OK", 1 : "Format Error",
                2 : "Server Failure", 3 : "Name Error", 4 : "[not implemented]",
                5 : "Server Refused" } ;
        }

    param
        messageAttribute : dnsMessage ;
    output
        Out :
            captureTime = formatEpochTime(captureTime),
            isResponse = DNS_RESPONSE_FLAG(),
            responseCode = DNS_RESPONSE_CODE() in dnsResponseCodes
                ? dnsResponseCodes[DNS_RESPONSE_CODE()] : (rstring) DNS_RESPONSE_CODE(),
            questionName = DNS_QUESTION_NAME(),
            questionType = DNS_QUESTION_TYPE() in dnsTypes
                ? dnsTypes[DNS_QUESTION_TYPE()] : (rstring) DNS_QUESTION_TYPE(),
            answerNames = DNS_ANSWER_NAMES(),
            answerTTLs = DNS_ANSWER_TTLS(),
            answerData = DNS_ANSWER_DATA(),
            nameserverNames = DNS_NAMESERVER_NAMES() ;
}

This operator parses the individual DNS message fields received in input tuples and emits tuples containing the DNS message data. Users can then use the DNS parser result functions to further process the payload data.


7700m Deepest Water Fishes — Custom Operator

The Custom operator is a special logic-related operator that can receive and send any number of streams and does nothing by itself; it thus offers a blank slate for customization. Let us see how to sort a map by its values.

() as CustomSort = Custom(outputStream) {
    logic
        state : {
            mutable map<rstring, int32> keyValueMap = { "e" : 10, "b" : -24, "c" : 50, "a" : -32, "d" : 230 } ;
            mutable list<int32> valueList ;
            mutable list<rstring> keyList = keys(keyValueMap) ;
            mutable int32 i = 0 ;
            mutable int32 j = 0 ;
            mutable int32 currentCount ;
            mutable rstring currentKey ;
        }

    onTuple outputStream : {
        // Collect the values in the same order as the keys
        mutable rstring tempKey ;
        while (i < size(keyList)) {
            tempKey = keyList[i] ;
            appendM(valueList, keyValueMap[tempKey]) ;
            i++ ;
        }
        // Insertion sort, descending by value; keys move together with their values
        i = 1 ;
        while (i < size(valueList)) {
            currentCount = valueList[i] ;
            currentKey = keyList[i] ;
            j = i - 1 ;
            while (j >= 0 && valueList[j] < currentCount) {
                valueList[j + 1] = valueList[j] ;
                keyList[j + 1] = keyList[j] ;
                j = j - 1 ;
            }
            valueList[j + 1] = currentCount ;
            keyList[j + 1] = currentKey ;
            i++ ;
        }
        // Print once, after the sort has finished
        printStringLn("Sorted Values->" + (rstring) valueList) ;
        printStringLn("Sorted Keys->" + (rstring) keyList) ;
        shutdownPE() ;
    }
}

shutdownPE() shuts down the current processing element (PE). What is a PE? When you compile a stream processing application, the operator and stream relationships that make up the data flow graph are broken down into a set of individual execution units called PEs.
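
You can influence how operators are fused into PEs from SPL itself. For instance, the config placement clause with partitionColocation hints that operators sharing the same ID should run in the same PE; the group name "myGroup" below is an illustrative assumption —

() as Sink = Custom(Hello) {
    logic
        onTuple Hello :
            printStringLn(message) ;
    config
        placement : partitionColocation("myGroup") ; // operators sharing this ID are fused into one PE
}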


This is not the end of our deep dive; this ocean offers a lot more. The journey to the deepest point underwater is still on. I’ll hopefully discover some lost pirate treasure before my next post, but you Keep Streaming!

September 7th, 2018 | Big Data