How to write a filter (using a DB) with Highland.js


I'm trying to design a workflow using Highland.js. I have not been able to figure out how Highland.js can be used for it.

I have a stream-based workflow as below (pseudo code):

    read                      // fs.createReadStream(...)
       .pipe(parse)           // JSONStream.parse(...)
       .pipe(filterDuplicate) // mongoClient.db.collection.count({}) > 0
       .pipe(transform)       // fn(item) { return transform(item); }
       .pipe(write);          // mongoClient.db.collection.insert(doc)

The filterDuplicate step looks up the database to check whether the record that was read already exists (using a condition) and returns a boolean result. For the filter to work, it needs an active DB connection, which I want to reuse until the stream is complete. One way is to open a connection before the read and close it on the 'finish' event of the write; this means I need to pass the connection as a param to both the filter and the write, which works only if both methods use the same database.

In the above workflow, filterDuplicate and write may use different databases. So I would expect the connection to be contained and managed within each function, which makes each one a self-contained, reusable unit.

I'm looking for inputs on how this can be designed using Highland.

Thanks.

It's not going to be quite as easy as calling pipe a bunch of times. You've got to use the most appropriate API method for each task.

Here's a rough example of something close to what you're going to end up with:

    const h = require('highland');
    const JSONStream = require('JSONStream');

    // mongoCountQuery, mongoUpdateQuery, params, transform and cb are placeholders;
    // params would be built from each record.
    h(read) // wrap the Node readable in a Highland stream
      .through(JSONStream.parse([true])) // pipe through the JSON parser
      .map((x) =>
        h.wrapCallback(mongoCountQuery)(params) // you don't have to do it this way
          .filter((count) => count > 0) // if it met the criteria, hold onto it
          .map(() => x)) // emit the record itself rather than the count
      .merge() // because you've got a stream of streams after that `map`
      .map(transform) // just your standard map through a transform
      .map((doc) => h.wrapCallback(mongoUpdateQuery)(params)) // another async operation
      .merge() // another stream-of-streams situation
      .collect() // toCallback expects a single value, so gather the results first
      .toCallback(cb); // call home to say we're done
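
On the connection part of the question, one possible approach is to let each stage open its own connection lazily and close it when the stream ends. The following is only a minimal sketch under assumptions: `makeDbFilter`, `connect`, and `keep` are hypothetical names, the in-memory `fakeConnect` stands in for a real MongoDB client, and the stage is written as a plain Node `Transform` rather than with a Highland-specific API.

```javascript
const { Transform, Readable } = require('stream');

// Hypothetical helper: a filter stage that owns its DB connection.
// `connect()` must return a promise of a connection exposing close().
// `keep(conn, item)` resolves to true when the item should pass through.
function makeDbFilter({ connect, keep }) {
  let connPromise = null; // opened lazily, on the first item

  const closeConn = async () => {
    if (!connPromise) return; // never opened, or already closed
    const pending = connPromise;
    connPromise = null;
    const conn = await pending;
    await conn.close();
  };

  return new Transform({
    objectMode: true,
    transform(item, _enc, done) {
      connPromise = connPromise || connect();
      connPromise
        .then((conn) => keep(conn, item))
        // Passing undefined as data drops the item from the output.
        .then((ok) => done(null, ok ? item : undefined), done);
    },
    flush(done) {
      // Upstream has ended: release the connection before signalling end.
      closeConn().then(() => done(), done);
    },
    destroy(err, done) {
      // Also release the connection if the stream fails or is torn down early.
      closeConn().then(() => done(err), () => done(err));
    },
  });
}

// In-memory stand-in for a real database, for demonstration only.
const log = [];
const fakeConnect = async () => {
  log.push('open');
  const stored = new Set([2]); // pretend the record with id 2 already exists
  return {
    has: async (id) => stored.has(id),
    close: async () => { log.push('close'); },
  };
};

async function demo() {
  const out = [];
  const filter = makeDbFilter({
    connect: fakeConnect,
    keep: async (conn, item) => !(await conn.has(item.id)), // drop existing records
  });
  const source = Readable.from([{ id: 1 }, { id: 2 }, { id: 3 }]);
  for await (const item of source.pipe(filter)) {
    out.push(item.id);
  }
  return { out, log };
}
```

Because the stage is an ordinary duplex stream, it should slot into a Highland chain with `.through(makeDbFilter({ ... }))`, and a write stage could be built the same way with its own `connect`, so the filter and the write can point at different databases.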
