I’ve been aware of transducers for a little while, but haven’t actually used them, or even really felt like I fully grokked what they were good for. They come from the clojure community, but are making their way into plenty of other languages and libraries too. I’ve seen claims that they are a game-changing, breathtaking new concept, which didn’t really square with what they looked like.

So I thought I’d learn more about them by just attempting some plausible but detailed examples with them in JavaScript. If you’ve heard about transducers but aren’t really sure what they’re good for, perhaps this’ll help clarify. And if you’ve never heard of transducers, feel free to take a detour via the clojure documentation.

tl;dr

After trying out transducers on a bunch of sample problems, my takeaway is that transducers are:

  • a neat encapsulation for the usual set of functional transformations (map, filter, reduce, etc)
  • a common way to represent a transformation to be done later, which leads to:
  • a way to apply multiple transformations without creating intermediate collections
  • a decent way to compose asynchronous transformations (compared to vanilla JavaScript efforts)
  • significantly more complex to understand than map, filter, reduce, etc.

These features are neat, but I wouldn’t call them ground-breaking. And I may be spoiled, but “the ability to use the same transformations on collections of different types (arrays, streams, etc)” isn’t terribly novel outside of JavaScript - JavaScript just happens to be one of the worst high-level languages for dealing with things like lazy or blocking computation. Most other languages can already achieve the majority of these goals using some sort of lazy iterator / collection protocol and plain ol’ map, filter, and reduce.

To expand on the last point above about complexity - sometimes this won’t matter. If you just use t.map and friends, you can largely ignore the complexity as an implementation detail. But if you get into more complex transducers, you may well be baffled by their failure to do what you expect, and figuring out why can be difficult. This happened to me plenty of times when coming up with these examples, and they aren’t even that complex.

So what are transducers?

From what I’d read they seemed mostly like a way of representing map, filter and reduce style operations as an object which can be applied later. They do this by encapsulating three functions - init, step and result. To make matters harder to wrap your head around, transducers don’t directly do things to a collection - they wrap another transducer. Rather than go into the theory and implementation of individual transducers, I’m going to focus on how they’re used, and by extension when they are useful.

Here’s an example of a map transducer, which can be used to build a copy of a sequence with each element incremented by 1.

var addOneToEverything = t.map(function(x) { return x + 1; });
var result = t.into([], addOneToEverything, someSequence);

But that’s not a very useful example, because it doesn’t gain you anything. After all, it’s trivial to store and reuse a mapping function already:

var addOne = function(x) { return x + 1 };
var result = someSequence.map(addOne);

If you really don’t want to remember that addOne should be applied with map (and not to filter or reduce), you could always wrap it further:

var addOneToEverything = function(seq) {
  return seq.map(function(x) { return x + 1 });
}
var result = addOneToEverything(someSequence);

Another stated strength of transducers is that they compose easily - you can build a chained transducer with:

var chain = t.compose(addOneToEverything, filterOutEvenNumbers);

..but I can compose functions, too:

var result = filterOutEvenNumbers(addOneToEverything(items));

There are definite downsides to this approach:

  • it builds intermediate arrays - if you apply multiple transformations you’ll be building an intermediate array for each one (see the sketch just below this list)
  • it only works for synchronous transformations - you need an entirely different API to deal with async map functions
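
To illustrate the first point: with the plain-function approach, every stage allocates a complete new array before the next one gets to run.

var intermediate = addOneToEverything(items);    // whole new array #1
var result = filterOutEvenNumbers(intermediate); // whole new array #2
// ...and a third transformation would mean a third array, and so on.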

I’ve also heard that a big benefit of transducers is that they don’t have to know what kind of sequence you’re dealing with. I don’t really buy that, since this is all just duck typing - whatever sequence type you have, as long as it has a map function, you’re set. Whether it’s an object, or an array, or some custom iterator type, addOneToEverything doesn’t know or care.
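
For instance, addOneToEverything from above doesn’t care what it’s mapping over, as long as the thing has a .map method. Here’s a contrived illustration (MappableRange is a made-up type for this example, not from any library):

// a minimal "mappable" - all addOneToEverything needs is a `.map` method
function MappableRange(start, end) {
  this.map = function(fn) {
    var out = [];
    for (var i = start; i < end; i++) out.push(fn(i));
    return out;
  };
}

addOneToEverything([1, 2, 3]);               // => [ 2, 3, 4 ]
addOneToEverything(new MappableRange(1, 4)); // => [ 2, 3, 4 ]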

The interesting thing is that the above downsides are really just limitations of the JavaScript language. I don’t know enough about clojure to know whether they are big issues there, but I know enough about StratifiedJS (I helped build it!) to know that it has neither of those problems.

Specifically, in StratifiedJS any expression can “suspend”, and the runtime will not evaluate anything that depends on that expression until its value is “ready”. It’s as if the language were smart enough to evaluate every single Promise expression automatically, so that instead of a promise you just see the eventual result. This isn’t exactly how it’s implemented, but conceptually it’s quite similar. And rather than managing concurrency using event-based callbacks, StratifiedJS provides explicit concurrency syntax with structured (lexical) concurrency, instead of a tangle of events and callbacks which are hard to understand and reason about.

Let’s transduce some things!

So, let’s try and do a bunch of semi-complex things with transducers in JavaScript, and see how they compare to StratifiedJS, a superset of JavaScript with rich concurrency support.

For the examples below, I’m using the following libraries:

  • transduce
  • transduce-stream
  • object-stream
  • es6-promise

Not being up to scratch with the latest npm hotness, I wasn’t really sure which libraries to go with - there are seemingly endless variations on the name “transducers”, and more promise implementations than you can poke a stick at. But these are what I used, particularly because transduce has builtin support for async transducers, which doesn’t seem to be the case with some other libraries.

If you want to run an example, you can throw it in a .js file after the following prelude:

#!/usr/bin/env node
var t = require('transduce');
var Promise = require('es6-promise').Promise;
var TransduceStream = require('transduce-stream');
var ObjectStream = require('object-stream');
var EventEmitter = require('events').EventEmitter;

If you want to run the StratifiedJS examples, you just need this at the top instead (you’ll obviously need StratifiedJS installed, too):

#!/usr/bin/env sjs
@ = require('sjs:std');

Example 1: nested map / filter / groupBy

To pick a random sequence processing task, let’s try to:

  1. group sequential elements in a stream based on whether they’re even or odd
  2. sum up each group
  3. report only the sums which are divisible by 3

It’s a bit arbitrary, but it includes a bunch of transformations which need to be chained. Here’s how I’d do it in StratifiedJS:

var items = [1, 2, 4, 12, 8, 3, 13, 5, 6, 7];
var sum = items -> items .. @reduce(0, (a,b) -> a + b);
items = items .. @groupBy(x -> x%2)
  .. @transform(([key, items]) -> items) // ignore the `key`
  .. @transform(sum)
  .. @filter(x -> x%3 === 0)
  .. @toArray();
console.log(items);

// => [ 21, 6 ]

Aside: StratifiedJS primer

I don’t expect you to be familiar with StratifiedJS for these examples, but the intent is that (glossing over syntactic details) they should be fairly readable. StratifiedJS is a superset of JavaScript - most of the syntax is just JavaScript. But here’s a primer on the StratifiedJS-only features I’ll be using:

  • @foo by convention denotes a standard library function called foo.
  • transform is a lazy version of map - i.e. it produces a lazy stream rather than an array.
  • a .. someFunc() is like a pipeline. You don’t really need to know the details, other than that it’s just function application which reads from left-to-right - a .. f(x) is the same as f(a, x). It’s a lot like how composition works in a unix pipeline (cat foo | grep bar | wc -l).
  • a -> b is a lambda function, equivalent to function(a) { return b; }

So hopefully the above example is pretty straightforward - we’ve chained / composed transform, groupBy, filter using simple function application. Since we’re using transform (a lazy version of map), the transformations will be applied only as necessary - we won’t build up an array for each intermediate step.

Here’s what the equivalent code looks like with transducers:

var items = [1, 2, 4, 12, 8, 3, 13, 5, 6, 7];
var sum = {
  "@@transducer/init": function() { 
    return 0;
  },
  "@@transducer/result": function(result) { 
    return result;
  },
  "@@transducer/step": function(result, input) {
    return result + input;
  }
};

var transducer = t.compose(
  t.partitionBy(function(x) { return x%2; }),
  t.map(function(items) { return t.reduce(sum, items); }),
  t.filter(function(x) { return x%3 === 0; })
);
items = t.into([], transducer, items);
console.log(items);

// => [ 21, 6 ]

Not too bad, transducers. The glaring weirdness is that I had to implement sum myself as a (very boring) raw transformation object. I really expected that I’d be able to build a transducer from the kind of function you’d pass to reduce(), but I couldn’t figure out how. I’ll update this if someone enlightens me. But in terms of actually composing the transducers, it reads just like my StratifiedJS pipeline. And unlike a regular JavaScript pipeline, it won’t construct intermediate arrays either.
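
For what it’s worth, it’s easy enough to write your own little adapter from a plain reducing function (plus an initial value) to one of these transformation objects - the following helper is mine, not part of the transduce API:

var transformation = function(initial, fn) {
  return {
    "@@transducer/init": function() { return initial; },
    "@@transducer/result": function(result) { return result; },
    "@@transducer/step": fn // fn(result, input) -> new result
  };
};

// ...which makes `sum` a one-liner:
var sum = transformation(0, function(a, b) { return a + b; });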

Also note that sum is not a transducer - it’s a transformation. A transformation is like the “base” form of a transducer, while a transducer is actually a function which takes a transformation and returns a new transformation. This wasn’t explained terribly well in the documentation, and it feels a little odd since transducers are what all the fuss is about, while transformations are clearly important too.
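
To make that concrete, here’s roughly what a hand-rolled map transducer looks like - a function from one transformation (xf, the downstream one) to another. This is my own sketch of the shape, not the transduce library’s actual implementation:

var mapping = function(fn) {
  return function(xf) {
    return {
      "@@transducer/init": function() { return xf["@@transducer/init"](); },
      "@@transducer/result": function(result) { return xf["@@transducer/result"](result); },
      "@@transducer/step": function(result, input) {
        // apply `fn` to each input, then hand it on to the downstream step
        return xf["@@transducer/step"](result, fn(input));
      }
    };
  };
};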

Example 2: … but what about async?

It’s not a proper JavaScript example without an asynchronous spanner thrown in the works. The transduce npm module I chose has support for async transformations, so how do you use it?

var items = [1,2,3];
var plusOne = t.map(function(x) { return x + 1 });
var plusOneSlowly = t.map(function(item) {
  return new Promise(function(resolve, reject) {
    setTimeout(function() {
      resolve(item + 1);
    }, 100);
  });
});
var transducer = t.compose(plusOneSlowly, t.async.defer(), plusOne);
t.async.into([], transducer, items).then(function(items) {
  console.log(items);
});

// => [ 3, 4, 5 ]

OK, so that was actually pretty decent. We were able to combine a sync (plusOne) and an async (plusOneSlowly) transducer in the same pipeline. The changes were just:

  • use t.async.into (which returns a promise) instead of t.into
  • insert a t.async.defer() after each asynchronous transducer in the arguments to compose

The latter was a little confusing - originally I assumed I needed to wrap each async transducer, as in t.async.defer(plusOneSlowly). But when I did that, my transformation was completely ignored. So that was pretty alarming. It’s my fault for not reading the docs clearly enough, but it seems a bit counterintuitive. Now that I know how transducers work it actually makes a little more sense, because defer is nothing special - it’s just a transducer which resolves promises into their values. But it threw me off for a while.
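
Conceptually, I picture defer as something like the following - this is a sketch of my mental model rather than the library’s actual code, and it assumes the async machinery knows to wait on a promise returned from a step:

var defer = function(xf) {
  return {
    "@@transducer/init": function() { return xf["@@transducer/init"](); },
    "@@transducer/result": function(result) { return xf["@@transducer/result"](result); },
    "@@transducer/step": function(result, input) {
      // wait for the incoming promise, then feed its value downstream
      return Promise.resolve(input).then(function(value) {
        return xf["@@transducer/step"](result, value);
      });
    }
  };
};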

For comparison, here’s how it would look in StratifiedJS:

var items = [1,2,3];
console.log(items
  .. @transform(x -> x + 1)
  .. @transform(function(x) {
      hold(100); // builtin function which suspends for `n` milliseconds
      return x + 1;
    })
  .. @toArray());

// => [ 3, 4, 5 ]

StratifiedJS has no real need for promises - any expression can “suspend” and you get the benefits of sequential semantics combined with the performance of async code. So this was always going to knock JavaScript out of the park, which goes to show that sometimes a critical feature in one language might not be all that useful elsewhere.

Example 3: lazily reading lines from a file

This one is probably the most complex, but it’s a practical, real-world application.

One thing I’ve noticed with nodejs streams is that doing anything truly custom is a massive pain - you basically have to implement a DuplexStream and deal with all the intricacies of the Stream interface yourself. As an example, here’s the 100+ line implementation of byline, which implements the process I’m about to describe. So it’s a very low bar that transducers will need to beat here ;)

The main complexity here is that we read streams in “chunks”, and emit “lines” - each chunk may contain any number of lines, and we need state to track whatever partial line we’ve seen so far. So each time we see a chunk from the source, we’ll have zero or more lines to emit.
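
Stripped of all the stream machinery, the core bookkeeping is just this (a plain JavaScript sketch; feed is simply my name for it):

var buffer = '';
var feed = function(chunk) {
  buffer += chunk;
  var lines = buffer.split('\n');
  buffer = lines.pop(); // the last piece is an unfinished line (possibly '')
  return lines;         // zero or more complete lines, ready to emit
};

feed('I am the first');           // => []
feed(' line\n..and I am the se'); // => [ 'I am the first line' ]

Both versions below do exactly this, plus a final flush of whatever is left in the buffer once the input ends.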

Here’s how I do it in StratifiedJS:

// Fake `chunks` stream - we use the same `Stream` interface for
// real nodeJS streams, so this isn't cheating.
var fileChunks = @Stream(function(emit) {
  var chunks = [
    'I am the first',
    ' line\n..and I am the second\n ...',
    ' third!'
  ];
  chunks .. @each {|chunk|
    hold(100);
    emit(chunk);
  };
});

// This accepts a "chunk" stream and returns a "line" stream.
// Both streams are lazy - the next chunk won't be read until it's required
var linesOfStream = function(stream) {
  return @Stream(function(emit) {
    // buffer any partial contents of the current line
    var current = '';
    // iterate over chunks
    stream .. @each {|chunk|
      current += chunk;
      var lines = current.split('\n');
      current = lines.pop();
      // when we have any full lines, emit them
      lines .. @each(emit);
    }
    // emit the last line, if any
    if(current != '') emit(current);
  });
};

// Collect the (lazy) stream into an array and just log it
console.log(fileChunks
  .. linesOfStream
  .. @transform(x -> x + "!")
  .. @toArray());

// => [ 'I am the first line!', '..and I am the second!', ' ... third!!' ]

..and here’s how you can do it with a JavaScript transducer:

var chunks = [
  'I am the first',
  ' line\n..and I am the second\n ...',
  ' third!'
];

// make a nodejs stream with artificial delay between each chunk
var fileChunks = ObjectStream.fromArray(chunks).pipe(
  ObjectStream.map(function(item, done) {
    setTimeout(function() { done(null, item); }, 10);
  })
);

// The `Lines` transducer accepts chunks from the source, and
// emits lines into the downstream transducer:
var Lines = function(xf) {
  var current = '';
  return {
    "@@transducer/init": function() { 
      return xf["@@transducer/init"](); 
    },
    "@@transducer/result": function(result) { 
      result = xf["@@transducer/step"](result, current); 
      if (t.isReduced(result)) {
        return t.unreduced(result);
      }
      return xf["@@transducer/result"](result); 
    },
    "@@transducer/step": function(result, input) {
      input = input.toString('utf-8');
      current += input;
      var lines = current.split('\n');
      current = lines.pop();
      // feed each complete line downstream; if the downstream transducer
      // finishes early, stop and propagate the "reduced" value
      for (var i = 0; i < lines.length; i++) {
        result = xf["@@transducer/step"](result, lines[i]);
        if (t.isReduced(result)) {
          return result;
        }
      }
      return result;
    }
  };
};

var transducer = t.compose(Lines, t.map(function(x) { return x + '!'; }));
var lineStream = fileChunks.pipe(TransduceStream(transducer));

lineStream.on('data', function(d) {
  console.log('Line:', d.toString('utf-8'));
})
lineStream.on('end', function() {
  console.log('Done');
});

// =>
// Line: I am the first line!
// Line: ..and I am the second!
// Line:  ... third!!
// Done

So that’s quite a bit longer, but it’s actually not too bad, considering the alternative (implementing this directly as a NodeJS stream transformation). Plus, you could use the same transducer on arrays / event emitters which for some reason spit out chunks instead of lines.

One thing that complicates this is that we have to take care to handle “early return” values, which are named “reduced” values. I’m not sold on the name (perhaps “terminator” would be better), but it forces us to check for them in various places, since our transducer doesn’t have a simple one-to-one (or one-to-zero) mapping between input and output items.
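
For a feel of where reduced values come from in the first place, here’s roughly what a hand-rolled take(n) might look like - it has to wrap its result so that upstream transducers (like Lines above) know to stop feeding it. This is a sketch only, and it assumes the library exposes a t.reduced() constructor to match the t.isReduced() / t.unreduced() helpers used above:

var take = function(n) {
  return function(xf) {
    var remaining = n;
    return {
      "@@transducer/init": function() { return xf["@@transducer/init"](); },
      "@@transducer/result": function(result) { return xf["@@transducer/result"](result); },
      "@@transducer/step": function(result, input) {
        if (remaining <= 0) return t.reduced(result);
        remaining -= 1;
        result = xf["@@transducer/step"](result, input);
        // once we've taken enough, wrap the result so nothing more gets fed in
        return remaining === 0 && !t.isReduced(result) ? t.reduced(result) : result;
      }
    };
  };
};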

Also, I couldn’t find a good way to tell the stream machinery to leave my strings alone - I kept receiving them (in both the step function and the output data items) as Buffer objects. Maybe I missed it in the API, or maybe it just needs to be added. It’s not a big deal, but it feels hacky and unnecessary.

I originally thought this was one transformation that couldn’t be expressed as a simple reduce in StratifiedJS, because you need to keep track of two accumulators - the lines so far, as well as the current (partial) line buffer. But I realised you actually can, if you simply treat the “last line” as the buffer. This does mean you can’t omit the blank line at the end if your file ends with a newline. It also feels a little hacky to mutate the accumulator during each step, but it is pleasantly concise:

var lines = fileChunks .. @reduce([''], function(lines, chunk) {
  var current = lines.pop();
  current += chunk;
  current.split('\n') .. @each {|line| lines.push(line) };
  return lines;
});

Example 4: event streams

The last thing I thought I’d try was an event stream, because those can also be useful to process as a sequence (but unlike streams, you can’t tell your emitter to slow down - you just have to deal with events as they happen).

var transducer = t.compose(
  t.map(function(x) { return x+1; }),
  t.async.delay(1000),
  t.async.defer(),
  t.take(5)
);
var source = new EventEmitter();
var receiver = new EventEmitter();

var result = t.async.emitInto(receiver, transducer, source);

// kick off an infinite set of `source` events
var timeout;
var spawn = function() {
  var count = 0;
  return function() {
    timeout = setTimeout(function() {
      timeout = null;
      source.emit('data', count++);
      spawn();
    }, 500);
  };
}();
spawn();

receiver.on('data', function(data) {
  console.log("Data:", data);
});

receiver.on('end', function() {
  console.log("Done");
  if(timeout != null) clearTimeout(timeout);
});

// =>
// Data: 1
// Data: 2
// Data: 3
// Data: 4
// Data: 5
// ( ... program never terminates)

This one is a complete failure.

Note that I explicitly added code to shut down the emitter on the end event, which the documentation implied I would see once take(5) had caused the transducer to finish prematurely. But it never stopped, nor was the underlying listener removed from source - if I insert a console.log you can see that the map step still gets called forever with new values, but its results just get ignored. And since I never actually see an end event, it’s impossible to tell when my transducer has “finished”. If anyone knows how to do this right, I’d be interested to hear it.

Extra credit: streams & async processing

Even though streams are inherently asynchronous, I could find no way to properly apply an asynchronous transducer to my Lines stream. I tried with:

var transducer = t.compose(Lines, t.async.delay(1000), t.async.defer());

… but as soon as the lines were all done being emitted, the output stream saw end() - any promises which had not resolved were simply dropped. That’s very bad, especially since whether or not it “works” is based on the speed of your input stream, so it could appear to work 95% of the time but silently do the wrong thing when your system is under load.

I don’t count this (or the previous example) as an inherent ding against transducers, though - it seems like this is probably just a bug in the transduce-stream library.

Closing thoughts

If you haven’t read the tl;dr above, that summarises my thoughts pretty well. But it’s worth reiterating that using transducers in JavaScript is failure-prone and hard to figure out¹.

  • It’s really easy to abort your program with no indication of why - accidentally returning undefined from a function can short-circuit something, so that instead of an error your program just halts prematurely. That’s not really a problem with transducers so much as an occupational hazard of JavaScript, where a stray callback can silently ruin everything. I think I’ve been spoiled by using StratifiedJS for so long ;)

  • Also (probably the cause of some of the above), unhandled exceptions in my code would not get printed; the process would just exit silently. This is a terrible developer experience, but I don’t actually know which library is to blame. This is also just something that happens when you’re programming in JavaScript, sadly.

Appendix A: More examples

Feeling like I now grokked transducers, I figured I’d explore some more of their particular features. A lot of fuss is made of their ability to apply transformations to all sorts of collections, not just arrays. So let’s see how that looks, shall we?

“Mapping” the values of an object

var mapVal = function(fn) {
  return t.map(function(pair) {
    return [pair[0], fn(pair[1])];
  });
}

var src = {
  key1: 'val1',
  key2: 'val2',
  key3: 'val3',
};
var transducer = mapVal(function(x) {
  return x + '!';
});
var result = t.into({existing: 1}, transducer, src);
console.log(result);

// => { existing: 1, key1: 'val1!', key2: 'val2!', key3: 'val3!' }

I can’t really think of any situation where I’d need to transduce “into” a non-empty destination, but it is nevertheless quite neat.

Custom iterators

// custom "infinite sequence from <n>" implementation
function Count(initial) {
  var rv = {};
  rv[t.protocols.iterator] = function() {
    return {
      _idx: initial,
      next: function() {
        var self = this;
        return {
          value: new Promise(function(resolve) {
            setTimeout(function() {
              resolve(self._idx++);
            }, 10);
          }),
        };
      },
    }
  };
  return rv;
}
var transducer = t.compose(
  t.map(function(x) { return x+1; }),
  t.async.delay(10),
  t.async.defer(),
  t.take(5)
);

t.async.into([], transducer, Count(10)).then(function(result) {
  console.log(result);
});

// => [ 11, 12, 13, 14, 15 ]

This is pretty neat - we take an infinite sequence starting from 10, add one to each element, and then just take the first five numbers of the result. Of course, this is also pretty trivial with StratifiedJS:

function Count(n) {
  return @Stream(function(emit) {
    while(true) {
      hold(10);
      emit(n++);
    }
  });
}
console.log(Count(10)
  .. @transform(x -> x + 1)
  .. @take(5)
  .. @toArray());

// => [ 11, 12, 13, 14, 15 ]
  1. Fun fact:
    null + 2 + 4 == 6, and
    0 + 2 + 4 == 6, and
    t.into(0, sum, [2, 4]) == 6, but
    t.into(null, sum, [2, 4]) == 4
    because apparently an initial value of null is a special signifier for “return the last value of the input sequence”, which didn’t seem to be documented anywhere. I’d never rely on sane behaviour from null + <number> in real code, but it had me scratching my head when I encountered it by accident.