javascript - rx js aggregation for posts -


i'm trying understand functional programming rx js.

i have rx.observable emits "post" objects:

every post looks :

{ title: "sometitle", author: "someauthor" text: "sometext", date: "somedate", tags: ['tag1', 'tag2', ..., 'tagn']  } 

and want transform sequence sequence emits:

{  tag: 'tagname', postcount: n } 

this have far:

function tags(post) {         return post             .tags             .map(function(tag) { return { 'tag': tag, 'count': 1});      }  posts   .flatmap(tags)   .groupby(function(tagged) { return tagged.tag })    . // don't know how continue  

as said before, goal create sequence/observable emits {tag: 'tagname', postcount: n } each tag

thx in advance

edit:

i forgot mention looking "node oriented" answer.

this have far. it works, i'm not sure { ..., count: 1 } part. i'm looking more "elegant" solution.

posts     .flatmap(tags)     .map((tag) => {return {name: tag, count: 1}})     .groupby((tagcount) => {return tagcount.name})     .flatmap((taggroup) => {return taggroup.reduce((a,x) => {return {tag: x.name, count: (a.count + x.count)}})}) 

it this:

// sequesnce of posts sequence 10ms interval  var posts = rx.observable    .fromarray([      { tags: ['tag1', 'tag2'] },      { tags: ['tag1', 'tag3'] },      { tags: ['tag1'] },      { tags: ['tag1', 'tag2', 'tag3'] }    ])    .zip(rx.observable.interval(10), rx.helpers.identity)    .do(logger('post:'));    // sequence of post counts tags, , count changes  var tagscountchanges = posts.scan(    function (acc, post) {      var counts = acc.counts;      var changes = [];      post.tags.foreach(function (tag) {        counts[tag] = (counts[tag] || 0) + 1;        changes.push({ tag: tag, postscount: counts[tag] });      });      return { counts, changes };    }, { counts: {}, changes: [] })    .map(acc => acc.changes)    .do(logger('tagscountchanges:'));    var tagcountupdates = tagscountchanges    .concatmap(function (changes) {      return rx.observable        .fromarray(changes);    });    tagcountupdates    .foreach(logger('tagpostcounts:'));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.1.0/rx.all.js"></script>  <pre id="log"></pre>  <script>    var log = document.getelementbyid('log');      function logger(label) {      return function(item) {        log.appendchild(document.createtextnode(label + ' ' + json.stringify(item, null, 2) + '\n'));      };    }  </script>

update (in response edit1):

it work in node too:) can drop logger , interval posts sequence - show nice log of items intermediate observables while running snippet in browser.

i'm not sure { ..., count: 1 } part. i'm looking more "elegant" solution.

actually can drop { ..., count: 1 } part entirely:

posts     .flatmap(post => post.tags)     .groupby(rx.helpers.identity)     .flatmap(taggroup$ =>         taggroup$.reduce((acc,tag) => {return {tag, count: acc.count+1}}, {count:0})     ) 

about elegance: solution - consider more expressive, , simpler my. solution more performant @ larger tag counts(because not create internal observable each tag).

also solution different - emit stream of tag counts changes, not final counts (after posts stream completion).

you solution can modified achieve same result - replace reduce scan.

and visa versa - if total counts required, solution can simplified lot:

posts.reduce(   (counts, post) => {     post.tags.foreach(tag => {       counts[tag] = (counts[tag] || 0) + 1;     });     return counts;   }, {})   .flatmap(counts =>       object.keys(counts).map(         tag => ({tag, count: counts[tag]})      )   ) 

Comments

Popular posts from this blog

Django REST Framework perform_create: You cannot call `.save()` after accessing `serializer.data` -

Why does Go error when trying to marshal this JSON? -