There's been a lot of hype about Iteratees in the past, as well as several good articles. I've been playing around with them in my spare time, and they have some really neat properties. Before getting into those, let's cover the basics.
Iteratees are an immutable implementation of the Consumer/Producer pattern. A Consumer takes input and produces an output. A Consumer can be in one of three states:

* Processing: still waiting on more input
* Done: finished, holding a result
* Error: failed with an error
Let's look at a possible implementation.
sealed trait ConsumerState[I,A]
case class Processing[I,A](f: I => Consumer[I,A]) extends ConsumerState[I,A]
case class Done[I,A](result: A) extends ConsumerState[I,A]
case class Error[I,A](t: Throwable) extends ConsumerState[I,A]
The Processing state contains a function that takes input from a Producer and returns the next consumer. Remember that these are immutable, so any state change must return a new object. The Done state contains the result of a given consumer, and the Error state holds a Throwable describing what failed. The actual consumer class looks like the following:
trait Consumer[I,O] {
  def fold[A](f: ConsumerState[I,O] => Context[A]): Context[A]
}
The fold method is the only necessary method for a consumer. It takes a function that operates on any consumer state and returns a value in the iteratee Context. The Context is the mechanism that evaluates an iteratee. This is what lets asynchronous iteratees return values in a Future and operate on different threads.
Note: Consumers are called Iteratees in Haskell/Scalaz iteratee libraries.
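The Context is left abstract above. To follow along, the simplest possible choice is a synchronous context that evaluates everything immediately; this is only a sketch for experimentation, not the library's actual definition:

object SyncContext {
  // A trivial synchronous Context: values are evaluated immediately on the
  // calling thread. An asynchronous implementation would substitute a type
  // like scala.concurrent.Future here.
  type Context[A] = A
}

The later snippets only interact with Context through the contexted and Consumer.flatten helpers introduced below, so any lawful Context will do.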
Next is the Producer interface. A Producer should generate input values to feed into a Consumer. The producer 'drives' the Consumer to some final state, which is returned. Let's look at the interface:
trait Producer[I] {
  def into[O](input: Consumer[I,O]): Consumer[I,O]
}
The into method takes a consumer, drives whatever data is in the Producer through the consumer, and returns the resulting consumer. Note that this resulting consumer could still accept more input.
Note: Producers are called Enumerators in Haskell/Scalaz iteratee libraries.
Let's define a Producer.once method that can take a single value and feed it into a Consumer.
object Producer {
  def once[A](value: A): Producer[A] = new Producer[A] {
    def into[O](input: Consumer[A,O]): Consumer[A,O] = input fold {
      case Processing(f) => f(value)
      case other => Consumer(other)
    }
  }
}
The new producer simply folds over the consumer's state. If it's a processing consumer, we feed the value and return the result. If the consumer is in any other state, we return that state in a consumer.
The code has one major problem: the fold method returns values inside the Context of the iteratee. That is, we can't peek into a consumer unless we know where/how it is running and are in that context. This way, if the consumer is running inside a Future, all processing code will join with the consumer. Let's modify the code.
object Producer {
  def once[A](value: A): Producer[A] = new Producer[A] {
    def into[O](input: Consumer[A,O]): Consumer[A,O] = Consumer.flatten(input fold {
      case Processing(f) => contexted(f(value))
      case other => contexted(Consumer(other))
    })
  }
}
This code assumes two methods. The contexted method takes any expression and evaluates it in the iteratee context; the signature is def contexted[A](a: => A): Context[A]. The Consumer.flatten method can take a Context[Consumer[I,O]] and flatten it out into a regular Consumer[I,O]. (Implementation is here.)
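Under the trivial synchronous Context sketched earlier (where Context[A] is just A), both helpers collapse to almost nothing. This is only a sketch; an asynchronous Context would instead suspend the computation and flatten inside the Future:

def contexted[A](a: => A): Context[A] = a   // evaluation *is* the context

object Consumer {
  // With Context[A] = A, a Context[Consumer[I,O]] already is the consumer it
  // contains, so flattening is the identity.
  def flatten[I,O](c: Context[Consumer[I,O]]): Consumer[I,O] = c
}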
The biggest reason to use Iteratees is their composability. That is, we should be able to chain Consumers, such that one will run and then another. We should also be able to "zip" Consumers, such that two can run against the same input. To make this happen, we need to adapt our notion of consumer state a bit. First, let's ensure that 'leftover' input from one Consumer can be fed to the next when chaining.
case class Done[I,A](result: A, remainingInput: I) extends ConsumerState[I,A]
Now the completed state has an additional value: the remaining input that was not consumed by the first consumer. We can use this to implement our sequencing operation, andThen.
trait Consumer[I,O] {
  def andThen[O2](next: O => Consumer[I,O2]): Consumer[I,O2] = Consumer flatten this.fold {
    case Done(a, remaining) => contexted(Producer.once(remaining) into next(a))
    case Processing(k) => contexted(Consumer(Processing(in => k(in) andThen next)))
    case Error(e) => contexted(Consumer(Error(e)))
  }
}
The algorithm for chaining is simple:

* If the first consumer is done, feed the remaining input into the next consumer and return the resulting state.
* If the first consumer is still processing, feed data into it, and delay creating the second consumer.
* If the first consumer is in the error state, return that error state.
The fold pattern-match above reflects exactly that.
For those who may be curious, the above is actually the implementation for flatMap on Consumer, and it will let us use Consumers in for expressions. To experiment, let's implement a few Consumers, starting with peek.
The peek consumer should take in an input value, return that as its done state, and also put that value back onto its remaining value so that the next Consumer in the sequence will receive it again. Hence, we just peek at the value without really consuming it.
def peek[I]: Consumer[I,I] = {
  def handler(input: I) = Consumer(Done(input, input))
  Consumer(Processing(handler))
}
The peek consumer is pretty trivial. Let's implement a head consumer that will actually consume the first element.
def head[I]: Consumer[I,I] = {
  def handler(input: I) = Consumer(Done(input, ???))
  Consumer(Processing(handler))
}
This looks just like peek, except we need some way of not placing the previous input as the remaining input. To do this, we need a way of creating an "empty" input. While there are several ways to do this, the simplest is adding an additional level of abstraction. Let's change our definition of consumers as follows:
sealed trait StreamValue[+I]
case class Chunk[I](value: I) extends StreamValue[I]
case object EmptyChunk extends StreamValue[Nothing]
case object EOF extends StreamValue[Nothing]
case class Processing[I,O](f: StreamValue[I] => Consumer[I,O]) extends ConsumerState[I,O]
case class Done[I,O](result: O, remainingInput: StreamValue[I]) extends ConsumerState[I,O]
Now there is a StreamValue trait that can be one of three things: a valid input, an empty input, or a marker that the input is at its end. The Processing state is updated to take in a StreamValue and return the next consumer, and the Done state's remaining input becomes a StreamValue as well.
Let's reimplement peek and head now.
def peek[I]: Consumer[I,Option[I]] = {
  def handler(input: StreamValue[I]) = input match {
    case Chunk(value) => Consumer(Done(Some(value), Chunk(value)))
    case EOF => Consumer(Done(None, EmptyChunk))
    case _ => peek
  }
  Consumer(Processing(handler))
}
def head[I]: Consumer[I,Option[I]] = {
  def handler(input: StreamValue[I]) = input match {
    case Chunk(value) => Consumer(Done(Some(value), EmptyChunk))
    case EOF => Consumer(Done(None, EmptyChunk))
    case _ => head
  }
  Consumer(Processing(handler))
}
The peek and head consumers now need to return optional values, because they could receive an EOF input before having a chance to see a value. The only significant difference between the two is that head returns an EmptyChunk as its remaining input, whereas peek returns the original input. Now that we have these consumers, we can sequence them together.
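Sequencing uses for-expressions, which means Consumer needs map and flatMap. A minimal sketch: flatMap is just the andThen from earlier under another name, and map finishes with an EmptyChunk so it consumes no extra input:

trait Consumer[I,O] { self =>
  def flatMap[O2](f: O => Consumer[I,O2]): Consumer[I,O2] = self andThen f
  def map[O2](f: O => O2): Consumer[I,O2] =
    self andThen (o => Consumer(Done(f(o), EmptyChunk)))
}

With those in place, peek and head sequence naturally: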
def headTwice[I]: Consumer[I,Boolean] =
  for {
    p <- peek[I]
    h <- head[I]
  } yield p == h
This will run the peek Consumer and then feed the remaining input to head. The headTwice consumer should always return true, since peek does not actually consume the value from the stream.
The ability to sequence Consumers using for-expressions is one of their strengths. Complex protocols can be as simple as a large for-expression.
Two Consumers can also be 'zipped', or run off the same input. This can be handy, e.g. when checksumming a file while parsing it. Let's define a ZippedConsumer:
class ZippedConsumer[I,A,B](first: Consumer[I,A], second: Consumer[I,B]) extends Consumer[I,(A,B)] {
  // Helper method to push data through both consumers.
  private def drive(input: StreamValue[I]): Consumer[I,(A,B)] =
    new ZippedConsumer(Producer.once(input) into first, Producer.once(input) into second)
  def fold[R](f: ConsumerState[I,(A,B)] => Context[R]): Context[R] =
    first.fold { state1 =>
      second.fold { state2 =>
        (state1, state2) match {
          case (Done(a, i), Done(b, _)) => f(Done(a -> b, i))
          // If either consumer hits an error, the zipped consumer is in error.
          case (Error(e), _) => f(Error(e))
          case (_, Error(e)) => f(Error(e))
          case _ => f(Processing(drive))
        }
      }
    }
}
The ZippedConsumer implements two methods. The first, drive, is a private helper: if both underlying consumers are in a processing state, it feeds incoming data into them and returns a new ZippedConsumer with the resulting new consumers. It is used by fold below.
The second method, fold, is the core method of Consumers. It in turn folds over the underlying states of the two consumers and determines a new combined state. If both are completed, the ZippedConsumer enters its completed state, returning a tuple of both results. If either consumer is in an error state, the zipped consumer is in an error state. If either consumer is still processing, the zipped consumer uses the drive method to continue processing.
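Before using it, ZippedConsumer needs to be exposed somewhere convenient; here's a minimal sketch of a zip method on the Consumer trait (its exact placement is an assumption):

trait Consumer[I,O] { self =>
  // Run two consumers against the same input stream.
  def zip[O2](other: Consumer[I,O2]): Consumer[I,(O,O2)] =
    new ZippedConsumer(self, other)
}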
Now, given a consumer protocol consisting of msgHdr and bodyProcessor, and a checksumming consumer sha1sum, we can construct a new Consumer which reads in a message and validates its sha1sum:
val protocol: Consumer[ByteBuffer, Message] =
  for {
    hdr <- msgHdr
    (body, sha) <- bodyProcessor(hdr) zip sha1sum
    if sha == hdr.sha
  } yield Message(hdr, body)
Note: the above assumes we have a valid withFilter or filter method on Consumers. This method converts the Consumer state to an error if the condition does not hold.
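A sketch of such a filter, which fails the Done state when the predicate does not hold and leaves the other states untouched; withFilter simply delegates to it:

trait Consumer[I,O] { self =>
  def filter(p: O => Boolean): Consumer[I,O] = new Consumer[I,O] {
    def fold[A](f: ConsumerState[I,O] => Context[A]): Context[A] =
      self fold {
        case Done(o, _) if !p(o) =>
          f(Error(new NoSuchElementException("filter predicate failed")))
        case Processing(k) => f(Processing(in => k(in) filter p))
        case other => f(other)
      }
  }
  def withFilter(p: O => Boolean): Consumer[I,O] = filter(p)
}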
There's a lot of flexibility with iteratees now that we can consume data either in parallel or sequentially. Since every consumer is immutable, the sky's the limit on how you compose them.
The Iteratee library is starting to look nice, but there's still another concern. Say we have a Producer of byte buffers from files, but I'd like to write consumers against lines of input. If I have a conversion from streams of byte buffers to streams of character buffers, and another from character buffers to delineated "lines", then I should be able to chain these on a producer and use consumers of lines.
For example, here's what a line-count algorithm should look like against a file:
val bytes = read_channel_bytes(fileChannel, directBuffers = true)
val chars = bytes convert charset.decoder()
val lines = chars convert lineDecoder
val lineCount = lines into counter
lineCount fold {
  case Done(count, _) => contexted(count)
  case _ => sys.error("NO result!")
}
In this example, read_channel_bytes constructs a producer of byte buffers from the file. The convert method takes a stream conversion and returns a new Producer with a transformed stream. So, first bytes are converted into chars by charset.decoder(), then chars are converted into lines by the lineDecoder. Finally, lines are fed into the counter Consumer, yielding a line count.
Let's first look at the counter consumer, since it's the simplest:
def counter[I]: Consumer[I,Long] = {
  def step(count: Long): StreamValue[I] => Consumer[I,Long] = {
    case e @ EOF => Consumer(Done(count, e))
    case Chunk(_) => Consumer(Processing(step(count + 1)))
    case EmptyChunk => Consumer(Processing(step(count)))
  }
  Consumer(Processing(step(0)))
}
The counter consumer keeps track of the count in the step function, which returns the next processing step.
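To watch counter run, we need a producer with more than one element. Here's a sketch of a hypothetical list-backed producer; note it follows the same loose Producer.once(streamValue) convention that drive in ZippedConsumer already uses:

def fromList[A](as: List[A]): Producer[A] = new Producer[A] {
  def into[O](c: Consumer[A,O]): Consumer[A,O] = {
    // Feed each element as a Chunk, then signal end-of-input with EOF.
    val fed = as.foldLeft(c)((cur, a) => Producer.once(Chunk(a)) into cur)
    Producer.once(EOF) into fed
  }
}

Driving fromList(List("a", "b", "c")) into counter should end in Done(3, EOF).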
Now, let's look at the interface for a StreamConversion. Note: in Haskell/Scalaz these are called Enumeratees.
trait StreamConversion[I,I2] {
  def apply[O](i: Consumer[I2,O]): Consumer[I, Consumer[I2,O]]
  def convert[O](i: Consumer[I2,O]): Consumer[I,O] =
    apply(i).join
}
A StreamConversion has two methods: apply and convert. The convert method, seen above, takes a Consumer of one type of input stream (I2) and changes it into a consumer of another type (I). The apply method is the only abstract method of a stream conversion. It constructs a Consumer whose result is an underlying consumer. This is essentially what a conversion is: we construct one consumer which adapts an incoming stream, feeds it into the underlying consumer, and eventually returns that consumer's result. The join method, found here, is a convenience in this situation: most of the time we don't really care how a stream is being converted, we just want to pretend there is a new consumer.
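Here's a sketch of about the simplest possible StreamConversion: a hypothetical mapped conversion that applies a pure function to every chunk. Real conversions, like charset decoders, carry state between chunks, but the shape is the same:

def mapped[I,I2](g: I => I2): StreamConversion[I,I2] = new StreamConversion[I,I2] {
  def apply[O](inner: Consumer[I2,O]): Consumer[I, Consumer[I2,O]] = {
    def wrap(cur: Consumer[I2,O]): Consumer[I, Consumer[I2,O]] =
      Consumer(Processing {
        case Chunk(i)   => wrap(Producer.once(Chunk(g(i))) into cur)
        case EmptyChunk => wrap(cur)
        // Pass end-of-input along to the inner consumer, then finish.
        case EOF        => Consumer(Done(Producer.once(EOF) into cur, EOF))
      })
    wrap(inner)
  }
}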
Finally, Producers can also have a convert method that takes a StreamConversion and modifies their output. Let's take a look.
trait Producer[A] { self =>
  def into[O](c: Consumer[A,O]): Consumer[A,O]
  def convert[A2](conversion: StreamConversion[A,A2]): Producer[A2] =
    new Producer[A2] {
      override def into[O](c: Consumer[A2,O]): Consumer[A2,O] =
        Consumer flatten (self into (conversion apply c)).result
      override def toString = "ConvertedProducer(" + self + " by " + conversion + ")"
    }
}
This convert method simply takes any Consumer passed in and wraps it with the stream conversion before pushing data into it. Since everything in this framework is immutable, it's fine to re-use the same stream conversions (decoders) over and over again.
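For instance, using the hypothetical mapped conversion and fromList producer sketched earlier:

val ints: Producer[Int] = fromList(List(1, 2, 3))
val strings: Producer[String] = ints convert mapped[Int,String](_.toString)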
For some example Stream Conversions, see the charset decoder, line splitter, and word splitter.
So far, we've seen that iteratees are great for streaming data. But what if the stream supports random access? We need a way to communicate between Producers and Consumers. So far, a Producer can find out a consumer's state simply by folding on it, and a Consumer receives information from the producer via the StreamValue hierarchy. If we want the Consumer to be able to tell the Producer to move its input stream to a new location, we can accomplish this through some kind of channel in the ConsumerState.
Since a Consumer that requests a random-access seek (or any other operation) is probably in the midst of processing data, it makes sense to place a new message channel on the Processing state class. Let's take a look:
trait ProcessingMessage
case class Processing[I,O](next: StreamValue[I] => Consumer[I,O],
                           optMsg: Option[ProcessingMessage] = None) extends ConsumerState[I,O]
Now a Consumer can include an optional processing message for Producers. All of the ZippedConsumer, andThen and other combinator logic has to be changed so that processing messages are propagated. With that done, we can create a new Consumer called seekTo.
def seekTo(offset: Long): Consumer[ByteBuffer, Unit] =
  Consumer(Processing(in => Consumer.done((), in), Some(RandomAccessMsg.SeekTo(offset))))
This consumer will accept one input and immediately be done, placing that input back into the stream. It also carries the RandomAccessMsg.SeekTo message (not shown before). This message may or may not be handled by our Producer; if the Producer supports the SeekTo message, it will move the stream and then continue sending input. You can see an example implementation for file channels here.
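As a rough sketch of the producer side (hypothetical, and much simplified compared to the linked implementation), a file-channel producer could inspect the processing message before each read, assuming RandomAccessMsg.SeekTo is a case class:

def driveChannel[O](channel: java.nio.channels.FileChannel,
                    c: Consumer[java.nio.ByteBuffer, O]): Consumer[java.nio.ByteBuffer, O] =
  Consumer flatten (c fold {
    case Processing(k, Some(RandomAccessMsg.SeekTo(offset))) =>
      contexted {
        channel.position(offset)                        // honor the seek request
        driveChannel(channel, Consumer(Processing(k)))  // clear the message, keep going
      }
    case Processing(k, _) =>   // no message (or one we don't understand)
      contexted {
        val buf = java.nio.ByteBuffer.allocate(8192)
        if (channel.read(buf) < 0) k(EOF)               // exhausted: signal end-of-input
        else { buf.flip(); driveChannel(channel, k(Chunk(buf))) }
      }
    case other => contexted(Consumer(other))            // Done or Error: stop driving
  })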
This allows us to use for-expressions to send messages to the producer. For example, we could write the following:
def findData(key: Long): Consumer[ByteBuffer, Data] =
  for {
    index <- readIndex
    hdr = index get key
    _ <- seekTo(hdr.diskLocation)
    data <- grabBlob(hdr.size)
  } yield data
This consumer will first consume the index at the head of a file, and then look for the location on disk to consume the remaining data. This is a very elegant way to nest seek commands with other 'normal' consumers.
Hopefully, we've offered some insight into the design of a good iteratee library here. I'd like to finish up with an example program from my current toy branch of Scalaz, where I play with iteratees.
Here's a program that will perform the same calculations as the linux command line wc. That is, it will count words, characters and lines in a file. Note that if the file does not line up with the platform's default encoding, this will barf error messages.
import scalaz.nio.std._
import scalaz.effect.IO
import scalaz.syntax.monad._
import scalaz.nio.buffers._
/** Ensures a file is closed. */
def withFile[A](file: java.io.File)(proc: java.nio.channels.FileChannel => IO[A]): IO[A] =
  for {
    stream <- IO(new java.io.FileInputStream(file))
    result <- proc(stream.getChannel) onException IO(stream.close())
    _      <- IO(stream.close())
  } yield result
// Mimics the wc command line.
def wcLzy(file: java.io.File): IO[String] =
  withFile(file) { c =>
    val bytes: Producer[ImmutableBuffer[Byte,Read]] =
      bytechannels.read_channel_bytes(c, directBuffers = true)
    val chars: Producer[ImmutableBuffer[Char,Read]] =
      bytes convert charsets.decoder()
    val wordCount: Consumer[ImmutableBuffer[Char,Read], Long] =
      charchannels.words convert utils.counter
    val lineCount: Consumer[ImmutableBuffer[Char,Read], Long] =
      charchannels.lines convert utils.counter
    val allCount: Consumer[ImmutableBuffer[Char,Read], String] =
      lineCount zip wordCount zip utils.lengthCounter(_.remaining) map {
        case ((lc, wc), cc) => "lines: %d, words %d, chars %s" format (lc, wc, cc)
      }
    (chars into allCount).result
  }
def wc(file: java.io.File): String =
  wcLzy(file).unsafePerformIO
In this example, the Consumers are threaded through Scalaz's IO monad. This is not a necessity of the library; in fact, my goal for the asynchronous bits is to use futures rather than IO, to better represent the execution model.
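For a taste of what that might look like, here's a hypothetical Future-backed Context (a sketch, not part of the current library):

import scala.concurrent.{ExecutionContext, Future}

object AsyncContext {
  // Computations are scheduled on an ExecutionContext instead of being run
  // synchronously; contexted suspends the work inside a Future.
  type Context[A] = Future[A]
  def contexted[A](a: => A)(implicit ec: ExecutionContext): Context[A] = Future(a)
}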
I hope that this has simplified your view of Iteratees. They really are an amazingly flexible, elegant system for processing inputs and producing outputs. I'm only at the beginning of a decent Iteratee library, and already I can see the huge power in being able to avoid re-streaming data by combining consumers.
It's my personal opinion that Iteratees demonstrate a lot of good design principles of functional programming.
One aspect that I didn't get to delve into much is leveraging category theory for minimal implementations with rich APIs. Perhaps in a blog post covering Scalaz 7.