Functional Programming

Patterns in Scala

Who am I?

Agenda

Definition

Functional Programming is the composition of behavior using pure functions.

Pure Functions

Functions that are referentially transparent.

Which one is referentially transparent (RT)?

def plus(x: Int, y: Int) = 
  x + y

vs.

def log(msg: String): Unit = 
  logfile println wrap(msg)
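
One way to answer the question is the substitution test: an expression is referentially transparent if replacing it with its value does not change what the program does. The sketch below is only an illustration; `logLines` and `logImpure` are hypothetical stand-ins for the slide's `logfile` and `log`.

```scala
import scala.collection.mutable.ListBuffer

object RtDemo {
  // Pure: the result depends only on the arguments.
  def plus(x: Int, y: Int): Int = x + y

  // Hypothetical stand-in for the slide's logfile: an in-memory buffer.
  val logLines = ListBuffer.empty[String]

  // Impure: appends to shared state as a side effect.
  def logImpure(msg: String): Unit = logLines += msg

  // Substitution holds for plus: plus(1, 2) can be replaced by 3.
  val viaCall: Int = plus(1, 2) + plus(1, 2)
  val viaValue: Int = 3 + 3

  // Calling logImpure twice writes two lines...
  def logTwice(): Int = {
    logLines.clear()
    logImpure("a")
    logImpure("a")
    logLines.size
  }

  // ...but calling it once and reusing the Unit result writes only one,
  // so the call cannot be replaced by its value.
  def logOnce(): Int = {
    logLines.clear()
    val result: Unit = logImpure("a")
    val reused = (result, result)
    logLines.size
  }
}
```

`plus` passes the test; `logImpure` does not, because the number of log lines depends on how often it is called.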

Why stay pure?

A reason to care

Imagine you’re gluing together a set of services.

Each call to a service is an asynchronous network operation.

Some operations can be run in parallel.

We have a high load on the server and want to prevent blocking.

Basic Outline

Collecting statistics for a given GitHub user

A few building blocks

trait Future[A] {
  def isDone: Boolean
  def await: A
}
trait GithubService {
  def projects(user: User): Future[Seq[Project]]
  def pullreqs(project: Project): Future[Seq[PullRequest]]
  def watchers(project: Project): Future[Seq[User]]
}

A first start

def getStatistics(user: User): Statistics = {
  val projects = api.projects(user).await 
  var reqs = Seq.empty[PullRequest]
  var watchers = Seq.empty[User]
  for(project <- projects) {
    reqs ++= api.pullreqs(project).await
    watchers ++= api.watchers(project).await
  }
  Statistics(user, reqs, watchers)
}

A first start - Threading

Adding some Asynch

trait Future[A] {
  // B is the result type of the mapped function.
  def map[B](f: A => B): Future[B]
}
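
To illustrate what `map` buys us, here is a small sketch that uses the standard library's `scala.concurrent.Future` as a stand-in for the trait on the slide. That substitution is an assumption, and `projectNames` and `projectCount` are hypothetical helpers.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object MapDemo {
  // Hypothetical asynchronous lookup, standing in for api.projects(user).
  def projectNames(user: String): Future[Seq[String]] =
    Future(Seq("sbt", "scala-arm"))

  // map transforms the eventual value without blocking the caller.
  def projectCount(user: String): Future[Int] =
    projectNames(user).map(_.size)

  // Blocking happens once, at the edge of the program (here only for the demo).
  def run(): Int = Await.result(projectCount("josh"), 5.seconds)
}
```

The caller of `projectCount` gets a `Future[Int]` back immediately; only `run()` waits.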

More Asynch?

def getStatistics(user: User): Future[Statistics] =
  api.projects(user) map { projects =>
    var reqs = Seq.empty[PullRequest]
    var watchers = Seq.empty[User]
    for(project <- projects) {
      reqs ++= api.pullreqs(project).await
      watchers ++= api.watchers(project).await
    }
    Statistics(user, reqs, watchers)
  }

Map + Threads

Nesting!

val projects: Future[Seq[Project]] =
  api.projects(user)

val firstpullreqs: Future[Future[Seq[PullRequest]]] =
  projects map { projects =>
    api.pullreqs(projects.head)
  }

val flattenedPullReqs: Future[Seq[PullRequest]] =
  ???

Chaining Asynchronous Calls

object Future {
  // Signature only; the implementation is left out on the slide.
  def flatten[A](f: Future[Future[A]]): Future[A] = ???
}
trait Future[A] {
  def flatMap[B](f: A => Future[B]): Future[B]
}
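
The relationship between the two signatures can be sketched with `scala.concurrent.Future`, again as an assumed stand-in for the slide's trait (its `flatten` method needs Scala 2.12 or later). The `projects` and `pullreqs` functions here are hypothetical and return canned data.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FlatMapDemo {
  // Hypothetical asynchronous calls with canned results.
  def projects(user: String): Future[Seq[String]] = Future(Seq("sbt", "zinc"))
  def pullreqs(project: String): Future[Seq[Int]] = Future(Seq(1, 2, 3))

  // map alone nests the futures...
  def nested(user: String): Future[Future[Seq[Int]]] =
    projects(user).map(ps => pullreqs(ps.head))

  // ...flatten removes one level of nesting...
  def viaFlatten(user: String): Future[Seq[Int]] =
    nested(user).flatten

  // ...and flatMap does both steps at once.
  def viaFlatMap(user: String): Future[Seq[Int]] =
    projects(user).flatMap(ps => pullreqs(ps.head))

  // Demo-only helper that blocks for the result.
  def await[A](f: Future[A]): A = Await.result(f, 5.seconds)
}
```

Both routes produce the same value, which is why `flatMap` is often described as `map` followed by `flatten`.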

A fix for nesting!

val projects: Future[Seq[Project]] =
  api.projects(user)

val firstpullreqs: Future[Seq[PullRequest]] =
  projects flatMap { projects =>
    api.pullreqs(projects.head)
  }

Flatmap threading

Too much blocking

Traversing

object Future {
  // Signature only; the implementation is left out on the slide.
  def traverse[A,B](values: Seq[A])(asynchOp: A => Future[B]): Future[Seq[B]] = ???
}
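
A rough usage sketch of `traverse`, assuming `scala.concurrent.Future.traverse` behaves like the signature above for this purpose. The `watchers` function and its canned data are hypothetical.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object TraverseDemo {
  // Hypothetical asynchronous call: two made-up watchers per project.
  def watchers(project: String): Future[Seq[String]] =
    Future(Seq(project + "-fan1", project + "-fan2"))

  // One asynchronous call per project, collected into a single Future.
  // The results keep the order of the input sequence.
  def allWatchers(projects: Seq[String]): Future[Seq[String]] =
    Future.traverse(projects)(watchers).map(_.flatten)

  // Demo-only helper that blocks for the result.
  def run(projects: Seq[String]): Seq[String] =
    Await.result(allWatchers(projects), 5.seconds)
}
```

No call waits for the previous one to finish, yet the caller still sees one `Future` holding all results.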

Traversal and Threads

Traversing

def getStatistics(user: User): Statistics = {
  val projects: Future[Seq[Project]] = 
    api.projects(user)
  val requests: Future[Seq[Seq[PullRequest]]] = 
    projects flatMap { projects => Future.traverse(projects)(api.pullreqs) }
  val watchers: Future[Seq[Seq[User]]] = 
    projects flatMap { projects => Future.traverse(projects)(api.watchers) }
  // Still blocks: both results are awaited before Statistics is built,
  // so this version returns a plain Statistics rather than a Future.
  Statistics(user, requests.await.flatten, watchers.await.flatten)
}

Zipping/Joining

trait Future[A] {
  def zip[B](other: Future[B]): Future[(A,B)]
}
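
As a small illustration of `zip`, the sketch below pairs two independent results, using `scala.concurrent.Future` as an assumed stand-in for the slide's trait. The two count functions are hypothetical.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ZipDemo {
  // Hypothetical independent asynchronous computations.
  def pullreqCount: Future[Int] = Future(3)
  def watcherCount: Future[Int] = Future(5)

  def summary: Future[String] = {
    // Both futures are started here, before zip pairs their results.
    val prs = pullreqCount
    val ws = watcherCount
    (prs zip ws).map { case (p, w) => s"$p pull requests, $w watchers" }
  }

  // Demo-only helper that blocks for the result.
  def run(): String = Await.result(summary, 5.seconds)
}
```

Unlike `flatMap`, `zip` does not make the second computation depend on the first, so the two can run in parallel.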

Using zip

def getStatistics(user: User): Future[Statistics] = {
  val projects: Future[Seq[Project]] = api.projects(user)
  val requests: Future[Seq[PullRequest]] = 
     projects flatMap { projects => 
        Future.traverse(projects)(api.pullreqs) 
     } map (_.flatten)
  val watchers: Future[Seq[User]] = 
     projects flatMap { projects => 
        Future.traverse(projects)(api.watchers) 
     } map (_.flatten)
  val together: Future[(Seq[PullRequest], Seq[User])] =
     requests zip watchers
  together map { case (r, w) => Statistics(user, r, w) }
}

Some sugar

def getStatistics(user: User): Future[Statistics] = {
  val projects: Future[Seq[Project]] = api.projects(user)
  val requests: Future[Seq[PullRequest]] = for {
       ps <- projects
       reqs <- Future.traverse(ps)(api.pullreqs)
    } yield reqs.flatten
  val watchers: Future[Seq[User]] = ...
  for((r,w) <- requests zip watchers)
  yield Statistics(user, r, w)
}

What about testing?

We want to test the program logic, not the execution context.

Abstracting the Concepts

Functors

The map operation

trait Functor[Context[_]] {
  def map[A,B](
     value: Context[A],
     raw_function: A => B
  ): Context[B]
}
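
To make the abstraction concrete, here is a minimal sketch of one instance of this trait, for `Option`, plus a function written only against `Functor`. The names `optionFunctor` and `describe` are hypothetical and not part of the talk.

```scala
object FunctorDemo {
  trait Functor[Context[_]] {
    def map[A, B](value: Context[A], raw_function: A => B): Context[B]
  }

  // Instance for Option: apply the function if a value is present.
  val optionFunctor: Functor[Option] = new Functor[Option] {
    def map[A, B](value: Option[A], raw_function: A => B): Option[B] =
      value match {
        case Some(a) => Some(raw_function(a))
        case None    => None
      }
  }

  // Code written against Functor works for any context that has an instance.
  def describe[Context[_]](count: Context[Int])(implicit F: Functor[Context]): Context[String] =
    F.map(count, (n: Int) => s"$n projects")
}
```

`describe` never mentions `Option`; the same function would work with a `Functor[Future]` instance.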

Monads

The flatMap operation

trait Monad[Context[_]] {
  def point[A](raw_value: A): Context[A]

  def flatMap[A,B](
     value: Context[A],
     operation: A => Context[B]
  ): Context[B]
}
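
A minimal sketch of a `Monad` instance for `Option`, assuming the trait above. The derived `map` and the helpers `parsePort` and `validPort` are hypothetical additions for illustration.

```scala
object MonadDemo {
  trait Monad[Context[_]] {
    def point[A](raw_value: A): Context[A]
    def flatMap[A, B](value: Context[A], operation: A => Context[B]): Context[B]

    // map can be derived from point and flatMap, so every Monad is also a Functor.
    def map[A, B](value: Context[A], raw_function: A => B): Context[B] =
      flatMap[A, B](value, a => point[B](raw_function(a)))
  }

  val optionMonad: Monad[Option] = new Monad[Option] {
    def point[A](raw_value: A): Option[A] = Some(raw_value)
    def flatMap[A, B](value: Option[A], operation: A => Option[B]): Option[B] =
      value match {
        case Some(a) => operation(a)
        case None    => None
      }
  }

  // Hypothetical step that can fail: parse a string of digits.
  def parsePort(s: String): Option[Int] =
    if (s.nonEmpty && s.forall(_.isDigit)) Some(s.toInt) else None

  // The second step only runs if the first one produced a value.
  def validPort(s: String): Option[Int] =
    optionMonad.flatMap[Int, Int](parsePort(s), p => if (p < 65536) Some(p) else None)
}
```

The chaining has the same shape as the `Future` version earlier: each step decides what to do next based on the previous result.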

Applicative Functors

The zip operation

trait Applicative[Context[_]] {
  def point[A](raw_value: A): Context[A]
  
  def applyNested[A,B](
    value: Context[A],
    contexted_function: Context[A => B]
  ): Context[B]
}
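
The slide title calls this "the zip operation" while the trait exposes `applyNested`. The sketch below shows one way `zip` could be derived from `point` and `applyNested`; this derivation and the `Option` instance are assumptions for illustration, not the talk's own code.

```scala
object ApplicativeDemo {
  trait Applicative[Context[_]] {
    def point[A](raw_value: A): Context[A]
    def applyNested[A, B](value: Context[A], contexted_function: Context[A => B]): Context[B]

    // map: lift the raw function into the context, then apply it.
    def map[A, B](value: Context[A], raw_function: A => B): Context[B] =
      applyNested[A, B](value, point[A => B](raw_function))

    // zip: turn the left value into a function that waits for the right value.
    def zip[A, B](left: Context[A], right: Context[B]): Context[(A, B)] =
      applyNested[B, (A, B)](right, map[A, B => (A, B)](left, a => b => (a, b)))
  }

  val optionApplicative: Applicative[Option] = new Applicative[Option] {
    def point[A](raw_value: A): Option[A] = Some(raw_value)
    def applyNested[A, B](value: Option[A], contexted_function: Option[A => B]): Option[B] =
      (value, contexted_function) match {
        case (Some(a), Some(f)) => Some(f(a))
        case _                  => None
      }
  }
}
```

Neither side of `zip` inspects the other's result, which is what lets a `Future` instance run both sides in parallel.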

Iteration

The traverse operation.

trait Traverse[Collection[_]] {
  def traverse[Context[_]: Applicative,A,B](
     collection: Collection[A],
     operation: A => Context[B]
  ): Context[Collection[B]]
}
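
A possible `Traverse` instance for `List`, sketched under the assumption that the `Applicative` trait has the shape shown earlier. The `Option` instance and the `parse` and `parseAll` helpers are hypothetical.

```scala
object ListTraverseDemo {
  trait Applicative[Context[_]] {
    def point[A](raw_value: A): Context[A]
    def applyNested[A, B](value: Context[A], contexted_function: Context[A => B]): Context[B]
  }

  trait Traverse[Collection[_]] {
    def traverse[Context[_]: Applicative, A, B](
      collection: Collection[A],
      operation: A => Context[B]
    ): Context[Collection[B]]
  }

  // Instance for List: fold from the right, prepending each result inside the context.
  val listTraverse: Traverse[List] = new Traverse[List] {
    def traverse[Context[_]: Applicative, A, B](
      collection: List[A],
      operation: A => Context[B]
    ): Context[List[B]] = {
      val F = implicitly[Applicative[Context]]
      collection.foldRight(F.point[List[B]](List.empty[B])) { (a, acc) =>
        // Lift "prepend b" into the context, then apply it to the accumulated list.
        val prepend: Context[List[B] => List[B]] =
          F.applyNested[B, List[B] => List[B]](
            operation(a),
            F.point[B => List[B] => List[B]](b => rest => b :: rest))
        F.applyNested[List[B], List[B]](acc, prepend)
      }
    }
  }

  implicit val optionApplicative: Applicative[Option] = new Applicative[Option] {
    def point[A](raw_value: A): Option[A] = Some(raw_value)
    def applyNested[A, B](value: Option[A], contexted_function: Option[A => B]): Option[B] =
      (value, contexted_function) match {
        case (Some(a), Some(f)) => Some(f(a))
        case _                  => None
      }
  }

  // Hypothetical operation that can fail.
  def parse(s: String): Option[Int] =
    if (s.nonEmpty && s.forall(_.isDigit)) Some(s.toInt) else None

  def parseAll(xs: List[String]): Option[List[Int]] =
    listTraverse.traverse[Option, String, Int](xs, parse)
}
```

With `Option` as the context, one failing element makes the whole traversal `None`; with `Future` the same code would collect asynchronous results.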

A mite o’ refactoring

Generalize the GitHub API trait

trait GhApi[Context[_]] {
  def projects(user: String): Context[Seq[Project]]
  def pullreqs(proj: Project): Context[Seq[PullRequest]]
  def watchers(proj: Project): Context[Seq[Watcher]]
}

class AsyncGhApi extends GhApi[Future] { ... }

class TestingGhApi extends GhApi[SingleThreaded] { ... }

Generic Service

class GenericStatisticsService[Context[_]: Monad](api: GhApi[Context]) {  
  def statistics(user: String): Context[Statistics] = {
      val projects = api.projects(user)
      def get[B](f: Project => Context[Seq[B]]): Context[Seq[B]] = for {
        ps <- projects
        items <- ps traverse f
      } yield items.flatten
      val pullRequests = get(api.pullreqs)
      val watchers = get(api.watchers)
      (pullRequests zip watchers) map { case (prs, ws) => Statistics(user, ws, prs)}
  }
}

What is a SingleThreaded context?

object Contexts {
  type SingleThreaded[A] = A
  type Future[A] = akka.dispatch.Future[A]
}

SingleThreaded Functor

map method

def map[A,B](a: SingleThreaded[A])(f: A => B): SingleThreaded[B] =
  f(a)

Since SingleThreaded[A] is A, map is f(a)

SingleThreaded Monad

flatMap method

def flatMap[A,B](a: SingleThreaded[A],
                f: A => SingleThreaded[B]): SingleThreaded[B] =
  f(a)  // SingleThreaded[B] is B, so this is plain function application

SingleThreaded Applicative Functor

ap method

def ap[A,B](
  a: SingleThreaded[A],
  f: SingleThreaded[A => B]): SingleThreaded[B] =
  f(a)  // SingleThreaded[A => B] is A => B, so f can be applied directly

Now a single-threaded test

val api: GhApi[SingleThreaded] = new TestingGhApi
val service = new GenericStatisticsService(api)
val expected = Statistics("Josh",Seq(Watcher()), Seq(PullRequest()))
val results = service.statistics("Josh")
results must equalTo(expected)
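
The pieces above can be assembled into a self-contained sketch. It makes several assumptions: a single hand-rolled `Monad` trait with derived `map`, `zip` and `traverse` (instead of separate type classes or a library), made-up case classes, and canned stub data. `Statistics` takes watchers before pull requests, matching the test above.

```scala
object GenericServiceDemo {
  type SingleThreaded[A] = A

  trait Monad[Context[_]] {
    def point[A](raw_value: A): Context[A]
    def flatMap[A, B](value: Context[A], operation: A => Context[B]): Context[B]
    def map[A, B](value: Context[A], f: A => B): Context[B] =
      flatMap[A, B](value, a => point[B](f(a)))
    def zip[A, B](left: Context[A], right: Context[B]): Context[(A, B)] =
      flatMap[A, (A, B)](left, a => map[B, (A, B)](right, b => (a, b)))
    // Runs one operation per element, in order, and collects the results.
    def traverse[A, B](values: Seq[A], operation: A => Context[B]): Context[Seq[B]] =
      values.foldLeft(point[Seq[B]](Seq.empty[B])) { (acc, a) =>
        flatMap[Seq[B], Seq[B]](acc, bs => map[B, Seq[B]](operation(a), b => bs :+ b))
      }
  }

  // For SingleThreaded, every operation is plain function application.
  implicit val singleThreadedMonad: Monad[SingleThreaded] = new Monad[SingleThreaded] {
    def point[A](raw_value: A): SingleThreaded[A] = raw_value
    def flatMap[A, B](value: SingleThreaded[A],
                      operation: A => SingleThreaded[B]): SingleThreaded[B] =
      operation(value)
  }

  // Hypothetical domain types.
  case class Project(name: String)
  case class PullRequest(id: Int)
  case class Watcher(name: String)
  case class Statistics(user: String, watchers: Seq[Watcher], pullreqs: Seq[PullRequest])

  trait GhApi[Context[_]] {
    def projects(user: String): Context[Seq[Project]]
    def pullreqs(proj: Project): Context[Seq[PullRequest]]
    def watchers(proj: Project): Context[Seq[Watcher]]
  }

  class StatisticsService[Context[_]](api: GhApi[Context])(implicit M: Monad[Context]) {
    def statistics(user: String): Context[Statistics] = {
      def get[B](f: Project => Context[Seq[B]]): Context[Seq[B]] =
        M.flatMap[Seq[Project], Seq[B]](api.projects(user), ps =>
          M.map[Seq[Seq[B]], Seq[B]](M.traverse[Project, Seq[B]](ps, f), _.flatten))
      M.map[(Seq[PullRequest], Seq[Watcher]), Statistics](
        M.zip[Seq[PullRequest], Seq[Watcher]](
          get[PullRequest](api.pullreqs), get[Watcher](api.watchers)),
        { case (prs, ws) => Statistics(user, ws, prs) })
    }
  }

  // Stub with canned data; no threads, no network.
  object TestingGhApi extends GhApi[SingleThreaded] {
    def projects(user: String): SingleThreaded[Seq[Project]] = Seq(Project("sbt"))
    def pullreqs(proj: Project): SingleThreaded[Seq[PullRequest]] = Seq(PullRequest(1))
    def watchers(proj: Project): SingleThreaded[Seq[Watcher]] = Seq(Watcher("w1"))
  }

  def result: Statistics =
    new StatisticsService[SingleThreaded](TestingGhApi).statistics("Josh")
}
```

Because `SingleThreaded[Statistics]` is just `Statistics`, the result can be compared directly with an expected value, with no awaiting and no thread pool.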

Conclusions

Questions?