Wednesday, January 02, 2008

Distributed F# Sample

The week between Christmas and New Year's Day is typically a slow one; I took advantage of some quiet time in the office to play with F#. What with F# becoming a "first class" .NET language and shipping with VS 2008, I wanted to see what it would take to adapt one of our standard samples (using a Monte Carlo calculation to compute Pi in parallel across different cores and machines on the grid).

I was a bit rusty in functional programming (I took Lisp in about 1990), and the eventing took a bit of research to understand, but I got something working. (See code below) I'm not sure if I've made any grievous errors -- if you see any, please let me know!

I know Don Syme recently posted about using Parallel Extensions in F#; next I'll have to take a look to see if I can extend that from using many cores to using many cores on many boxes.

Anyway, here's the F# code I ginned up:
// F# Digipede Sample

open System

let mutable AccumulatedPi = 0.0

let numTasks = 100
let numIterationsPerTask = 1000000

// This function determines if a point lies within the Unit Circle
let InUnitCircle x y = sqrt(x * x + y * y)
// This is the class that will be distributed for remote computation; the DoWork
// method is overridden, and will be invoked on the objects when
// they are on the remote nodes
type MyWorker =
    inherit Digipede.Framework.Api.Worker
    val mutable _Pi : double
    val _NumIterations : int new(a) = { _NumIterations=a; _Pi=0.0 }

    override x.DoWork() =
      let r = new Random(x.Task.TaskId)
      let mutable XCoord = 0.0
      let mutable YCoord = 0.0
      let mutable NumberInUnitCircle = 0

      for i = 0 to x._NumIterations do
        XCoord <- r.NextDouble()
        YCoord <- r.NextDouble()
        NumberInUnitCircle <- NumberInUnitCircle + if InUnitCircle XCoord YCoord then 1 else 0

      x._Pi <- 4.0 * double NumberInUnitCircle / (double x._NumIterations)

// This function will be called (in the "master application") for each task that
// completes (see reference below)
let OnTaskCompleted (args) =
  let myArgs = (args :> Digipede.Framework.Api.TaskStatusEventArgs )
  let returnedWorker = (myArgs.Worker :?> MyWorker)
  printf " Task %d calculated %f \n" myArgs.TaskId returnedWorker._Pi
  AccumulatedPi <- AccumulatedPi + returnedWorker._Pi
  printf " Current total is %f\n" AccumulatedPi

// Ok, this is the code that is the "master" application. It will define and submit the job

// First, we instantiate a Digipede Client and tell it where the Digipede Server is
let Client = new Digipede.Framework.Api.DigipedeClient()

// Now, tell the system what class we're distributing. It will automatically
// determine which binaries need to be distributed to use this class
let JT = Digipede.Framework.JobTemplate.NewWorkerJobTemplate(type MyWorker ) ;
// this line tells the Digipede Agent to run these tasks "per core" on multi-core machines
JT.Control.Concurrency <- Digipede.Framework.ApplicationConcurrency.MultiplePerCore // Create a job, and add tasks to it. Each task gets a new MyWorker object
let aJob = new Digipede.Framework.Job()
aJob.Name <- "F# Job"
for i = 1 to numTasks do
let myTask = new Digipede.Framework.Task()

let mw = new MyWorker(numIterationsPerTask)
myTask.Worker <- (mw :> Digipede.Framework.Api.Worker)

// Next, I set up some events.
(* This is like an anonymous delegate. It will get executed each time we get notified that a task has completed. *)

do aJob.TaskCompleted.Add
(fun args -> ( printf "Task %d executed on %s \n" args.TaskId args.TaskStatusSummary.ComputeResourceName ) )

do aJob.TaskFailed.Add (fun args -> ( printf "Task %d failed \n" args.TaskId) )

// Another way to handle events is to give it a function to call on task completion
// I'm doing both, which is redundant
do aJob.TaskCompleted.Add(fun args -> OnTaskCompleted args)

let sr = Client.SubmitJob(JT, aJob)

printf "Submitted job %s\n" (sr.JobId .ToString() )
printf "Waiting for results\n"

let mybool = Client.WaitForJob(sr.JobId)
printf "Job finished\n"

AccumulatedPi <- AccumulatedPi / (double numTasks) printf "Calculated pi is %f\n" AccumulatedPi Threading.Thread.Sleep(5000)

As I said before: this is my very first dabble into F#, and I may be making mistakes. But it runs, and I've figured out a couple of ways to handle events.

Technorati tags: , ,