Topic
  • 3 replies
  • Latest Post - ‏2013-02-28T13:59:27Z by SystemAdmin
SystemAdmin
1245 Posts

Pinned topic Tip - Jump Start Streams Text Analytics using the Text toolkit

‏2012-02-02T17:21:13Z |
Streams 2.0.0.3 includes the Text Toolkit for extracting structured information from unstructured text. Below is an example that uses the
Internet and Text toolkits to jump-start your thinking on how Streams and the toolkits could be used.

The code included below monitors Craigslist for bikes and bike parts being sold in two regions of California. It extracts details from
the internet postings and streams them as tuples for downstream processing.

The processing steps:
  • Pulls the Craigslist RSS feeds for the San Francisco and San Diego bike listings using the Internet Toolkit. The data is pulled every 10 minutes.
  • Shreds the RSS document using Annotation Query Language (AQL), the Text Toolkit's language for building extractors.
  • Extracts metals (titanium (ti), steel, aluminum, ...) using AQL.
  • Extracts components (chain, wheel, seat, ...) using AQL.
  • Extracts the price using AQL.
  • Returns the extracted data as tuples to Streams.
  • Splits the results of the AQL processing based upon 'rating'.
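The shredding step is just a small line-scanning state machine. Here is a hypothetical Python sketch of it (not part of the Streams application, which does this inside a Custom operator in AnaRSS.spl):

```python
def burst_items(lines):
    """Accumulate RSS lines into one string per <item>...</item> block,
    mirroring the item-bursting logic in AnaRSS.spl."""
    items = []
    current = ""
    for line in lines:
        # '<item ' with the trailing space matters: Craigslist items carry
        # attributes (e.g. <item rdf:about="...">), and the space avoids
        # matching other tags that merely start with 'item'.
        if "<item " in line:
            current = ""           # start a fresh item
        current += line + "\n"
        if "</item>" in line:
            items.append(current)  # a complete item, ready for extraction
            current = ""
    return items
```

Each returned chunk is then handed to text analytics one at a time.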

The attached file (SqlAqlDraw.jpg) is a hybrid diagram depicting the SPL/AQL processing.

The application is composed of two files, both included below: AnaRSS.spl and AnaRSS.aql.

AnaRSS.spl : source


namespace application ;

use com.ibm.streams.text.analytics::TextExtract ;
use com.ibm.streams.inet::InetSource ;

composite AnaRSS
{
    type
        OneItem = tuple<rstring allLines> ;
        ItemSale = tuple<rstring id, rstring region, rstring pman, rstring man,
                         int32 rating, rstring price, rstring summary, rstring body> ;
        Material = tuple<rstring id, rstring substance> ;
    graph
        /* Get data from Craigslist every 10 minutes. */
        stream<list<rstring> lines> RssPage = InetSource()
        {
            param
                URIList : [ "http://sandiego.craigslist.org/bik/index.rss",
                            "http://sfbay.craigslist.org/bik/index.rss" ] ;
                incrementalFetch : true ;
                fetchInterval : 600.0 ;
                emitTuplePerURI : true ;
        }

        /* RSS is XML consisting of a collection of '<item>'s; burst them out
         * and push them to text analytics one at a time. */
        stream<OneItem> AnItem = Custom(RssPage)
        {
            logic
                state :
                {
                    mutable OneItem item = { allLines = "" } ;
                    mutable int32 idx = 0 ;
                }
                onTuple RssPage :
                {
                    for(rstring line in lines)
                    {
                        if(findFirst(line, "<item ", 0) >= 0)  // ' ' (space) is IMPORTANT
                        {
                            item.allLines = "" ;
                        }
                        item.allLines = item.allLines + line + "\n" ;
                        if(findFirst(line, "</item>", 0) >= 0)
                        {
                            submit(item, AnItem) ;
                            item.allLines = "" ;
                        }
                    }
                }
        }

        (stream<ItemSale> Sale ; stream<Material> material) =
            com.ibm.streams.text.analytics::TextExtract(AnItem)
        {
            param
                AQLFile : "AnaRSS.aql" ;  // found in the <project>/Data directory
                outputViews : "Sale", "Material" ;
        }

        /* Deduplicate based upon time. */
        (stream<ItemSale> deDupSinkSale) = DeDuplicate(Sale)
        {
            param
                timeOut : 60000.00 ;
                key : id ;
        }

        (stream<Sale> RateOne ; stream<Sale> RateTwo ; stream<Sale> RateThr) = Split(deDupSinkSale)
        {
            param
                index : rating ;
        }

        () as BikeMaterial = FileSink(material)
        {
            param file : "/tmp/BikeMaterial.txt" ; flush : 1u ; format : txt ;
        }
        () as Rate1 = FileSink(RateOne)
        {
            param file : "/tmp/Rate1.txt" ; flush : 1u ; format : txt ;
        }
        () as Rate2 = FileSink(RateTwo)
        {
            param file : "/tmp/Rate2.txt" ; flush : 1u ; format : txt ;
        }
        () as Rate3 = FileSink(RateThr)
        {
            param file : "/tmp/Rate3.txt" ; flush : 1u ; format : txt ;
        }
}
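The DeDuplicate step drops repeated listings: because the feeds are re-fetched periodically and most postings persist between fetches, the same id arrives again and again. A minimal Python sketch of the idea (hypothetical names and semantics; the real work is done by the toolkit's DeDuplicate operator with its timeOut and key parameters):

```python
def make_deduplicator(timeout_s):
    """Return a predicate that accepts a key only if it has not been seen
    within the last timeout_s seconds, akin to DeDuplicate keyed on 'id'."""
    last_seen = {}
    def accept(key, now):
        prev = last_seen.get(key)
        last_seen[key] = now          # remember the latest sighting
        return prev is None or now - prev > timeout_s
    return accept
```

This sketch uses a sliding window (the timer resets on every sighting); the actual operator's window semantics may differ.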

AnaRSS.aql : source

The AQL file: put this in the <project>/Data directory.


/* AnaRSS.aql
 *    Shredding the Craigslist RSS feed.... */

--- Detag: aka shred <title>, <dc:source> ... <description>
detag Document.text as DetaggedDoc
    annotate
        element 'title' as TitleElement,
        element 'dc:source' as SourceElement,
        element 'description' as DescriptionElement;

create view Summary as select title.match from TitleElement title;
create view Description as select desc.match from DescriptionElement desc;
create view Source as select src.match from SourceElement src;

/* Create dictionaries of significant/interesting words. */

--- For Material
create dictionary MaterialDict with case insensitive as
    ('aluminum', 'steel', 'ti', 'titanium', 'carbon', 'fiber');

create view MaterialType as
    extract dictionary 'MaterialDict' on S.match as match from Description S;

--- For Component
create dictionary ComponentDict with case insensitive as
    ('frame', 'fork', 'shock', 'headset', 'crankset', 'derailleur', 'shifters',
     'cassette', 'chain', 'wheels', 'wheel', 'tire', 'tires', 'brake', 'brakes',
     'handlebar', 'stem', 'grips', 'grip', 'saddle', 'seatpost', 'bike',
     'cruiser', 'bicycle');

create view Component as
    extract dictionary 'ComponentDict' on S.match as match from Summary S;

--- Derive a manufacturer and a rating from the manufacturer name, and compensate
--- for bad spellers (like me).
create table ManufactureMap(man Text, rate Integer, realMan Text) as
    values
        ('giant', 3, 'giant'), ('shimano', 3, 'shimano'), ('felt', 3, 'felt'),
        ('campi', 1, 'campy'), ('campy', 1, 'campy'), ('campagnolo', 1, 'campy'),
        ('cinelli', 1, 'cinelli'), ('schwin', 2, 'schwinn'), ('schwinn', 2, 'schwinn'),
        ('waterford', 2, 'waterford'), ('merlin', 2, 'merlin'), ('haro', 2, 'haro'),
        ('raleigh', 1, 'raleigh'), ('cannondale', 2, 'cannondale'), ('gt', 2, 'gt'),
        ('jamis', 2, 'jamis'), ('lemond', 3, 'trek'), ('trek', 3, 'trek'),
        ('fisher', 2, 'trek'), ('huffy', 2, 'sears'), ('bontrager', 3, 'trek'),
        ('specialized', 2, 'specialized'), ('bianchi', 1, 'bianchi');

create dictionary ManufactureDict from table ManufactureMap with entries from man;

create view Manufacture as
    extract dictionary 'ManufactureDict' on S.match as match from Summary S;

create view ManClean as
    select MM.rate as rate, ToLowerCase(M.match) as postMan, MM.realMan
    from ManufactureMap MM, Manufacture M
    where Equals(GetText(MM.man), ToLowerCase(M.match));

/* Extract with regular expressions. */

create view Post as
    --- extract: region, id
    extract regex /.*\/(\w+)\..*\/(\d+).*/ on R.match
        return group 0 as url and group 1 as region and group 2 as id
    from Source R;

create view Price as
    --- extract: price
    extract regex /\$[ ]{0,2}(\d+)/ on S.match
        return group 0 as summary and group 1 as cost
    from Summary S;

/* Output / exported to Streams */

create view Sale as
    select I.id as id, I.region as region, M.postMan as pman, M.realMan as man,
           M.rate as rating, P.cost as price, S.match as summary,
           SRC.match as URL, DESC.match as body
    from Post I, ManClean M, Price P, Summary S, Source SRC, Description DESC;
output view Sale;

create view Material as
    select I.id as id, ToLowerCase(Mat.match) as substance
    from Post I, MaterialType Mat;
output view Material;
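The two regular expressions in the Post and Price views translate directly to Python, which is a handy way to sanity-check them against sample data before wiring up the AQL. A hypothetical test harness (not part of the application):

```python
import re

# The same patterns as the AQL 'extract regex' clauses in AnaRSS.aql.
POST_RE = re.compile(r".*/(\w+)\..*/(\d+).*")   # group 1: region, group 2: posting id
PRICE_RE = re.compile(r"\$[ ]{0,2}(\d+)")       # group 1: the dollar amount

def extract_post(url):
    """Pull (region, id) out of a Craigslist posting URL."""
    m = POST_RE.match(url)
    return (m.group(1), m.group(2)) if m else None

def extract_price(summary):
    """Pull the first dollar amount out of a listing title."""
    m = PRICE_RE.search(summary)
    return m.group(1) if m else None
```

Note that AQL and Python regex dialects are close but not identical, so a check like this is a sketch, not a guarantee.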



Notes

  • You will need to 'Add Toolkit Location...' in the 'Streams Explorer' view for the Text and Internet toolkits.
  • I'll attach a project that can be imported into Eclipse, as well as a PDF document illustrating the steps necessary to import and run it.
  • SystemAdmin
    1245 Posts

    Re: Tip - Jump Start Streams Text Analytics using the Text toolkit

    ‏2012-02-02T17:57:05Z  
    Importable Streams Eclipse project, with AnaRSS.pdf illustrating the steps.

    To decompress...
    
    > tar -zxvf AnaRSS.tgz
    

    Attachments

  • bmwilli
    41 Posts

    Re: Tip - Jump Start Streams Text Analytics using the Text toolkit

    ‏2012-02-02T20:03:21Z  
    Great example!! Thanks.
  • SystemAdmin
    1245 Posts

    Re: Tip - Jump Start Streams Text Analytics using the Text toolkit

    ‏2013-02-28T13:59:27Z  
    To make it work with Streams 3.0:

    1. AnaRSS.spl:

    (stream <ItemSale> Sale;stream <Material>material) =
    com.ibm.streams.text.analytics::TextExtract(AnItem)
    {
    param
    AQLFile : "AnaRSS.aql" ; // found <project>/Data directory
    outputViews : "Sale", "Material" ;
    }
    to

    (stream <ItemSale> Sale;stream <Material>material) =
    com.ibm.streams.text.analytics::TextExtract(AnItem)
    {
    param
    uncompiledModules : "AnaRSS" ; // found <project>/Data directory
    outputViews : "AnaRSS.Sale", "AnaRSS.Material" ;
    outputMode:"multiPort";
    }

    2. AnaRSS.aql

    Added a new line to specify the module:

    module AnaRSS;

    --- Detag: aka shred <title>, <dc:source> ... <description>
    detag Document.text as DetaggedDoc
        annotate
            element 'title' as TitleElement,
            element 'dc:source' as SourceElement,
            element 'description' as DescriptionElement;
    ....

    3. Location of AnaRSS.aql: data/AnaRSS