How to write a Filter ( using DB ) with Highland.js -


i'm trying design workflow using highland.js. have not able figure out how highland.js can used it.

i have stream based workflow below (pseudo code),

read                      //fs.createreadstream(...)    .pipe(parse)           //jsonstream.parse(...)    .pipe(filterduplicate) //mongoclient.db.collection.count({}) > 0    .pipe(transform)       //fn(item) { return tranform(item); }    .pipe(write);          //mongoclient.db.collection.insert(doc) 

the filterduplicate looks database check if read record exists (using condition) , returns boolean result. filter work, needs active db connection, want reuse till stream complete. 1 way have open connection before read , close on 'finish' event of write; means need pass connection param filter , write, work if both methods use same database.

in above workflow, filterduplicate , write may use different databases. expect connection contained , managed with-in each function, makes self-contained reusable unit.

i'm looking inputs on how can designed using highland.

thanks.

it's not going quite easy using pipe bunch of times. you've got use appropriate api method task.

here's rough example of you're going end close to:

read   .through(jsonstream.parse([true]))   .through((x) => {     h((next, push) => { // use generator async operations       h.wrapcallback( mongocountquery )( params ) // don't have way         .collect()         .tocallback((err, result) => {           if ( result > 0 ) push( err, x ); // if met criteria, hold onto           return push( null, h.nil ); // tell highland stream done         });     });   })   .merge() // because you've got stream of streams after `through`   .map(transform) // standard map through transform   .through((x) => {     h((next, push) => { // generator async operations       h.wrapcallback( mongoupdatequery )( params )         .tocallback((err, results) => {           push( err, results );           return push( null, h.nil );         });     });   })   .merge() // stream-of-streams situation   .tocallback( cb ); // call home we're done 

Comments

Popular posts from this blog

html - Styling progress bar with inline style -

java - Oracle Sql developer error: could not install some modules -

How to use autoclose brackets in Jupyter notebook? -