Distributed F# Sample
The week between Christmas and New Year's Day is typically a slow one; I took advantage of some quiet time in the office to play with F#. What with F# becoming a "first class" .NET language and shipping with VS 2008, I wanted to see what it would take to adapt one of our standard samples (using a Monte Carlo calculation to compute Pi in parallel across different cores and machines on the grid).
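The Monte Carlo estimate itself is small: points drawn uniformly from the unit square land inside the quarter circle of radius 1 with probability pi/4, so four times the observed fraction approximates pi. Here is a minimal single-machine sketch of that idea, with no Digipede calls. The names `inUnitCircle` and `estimatePi` and the task counts are illustrative assumptions, not part of the sample.

```fsharp
open System

// Is the point (x, y) inside the unit circle? Comparing squared distance avoids a sqrt.
let inUnitCircle (x: float) (y: float) = x * x + y * y <= 1.0

// Hypothetical stand-in for one distributed task: estimate pi from `iterations`
// random points, seeding the generator with the task number.
let estimatePi (seed: int) (iterations: int) =
    let rng = Random(seed)
    let mutable hits = 0
    for _ in 1 .. iterations do
        let x = rng.NextDouble()
        let y = rng.NextDouble()
        if inUnitCircle x y then hits <- hits + 1
    4.0 * float hits / float iterations

// Run the "tasks" one after another and average their estimates.
let numTasks = 10
let numIterationsPerTask = 100000

let average =
    [ 1 .. numTasks ]
    |> List.averageBy (fun taskId -> estimatePi taskId numIterationsPerTask)

printfn "Calculated pi is %f" average
```

Each seed plays the role of one task; on the grid, those calls are the part that gets farmed out to different cores and machines.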
I was a bit rusty in functional programming (I took Lisp in about 1990), and the eventing took a bit of research to understand, but I got something working (see the code below). I'm not sure if I've made any grievous errors -- if you see any, please let me know!
I know Don Syme recently posted about using Parallel Extensions in F#; next I'll have to take a look to see if I can extend that from using many cores to using many cores on many boxes.
Anyway, here's the F# code I ginned up:
// F# Digipede Sample
#light
open System
let mutable AccumulatedPi = 0.0
let numTasks = 100
let numIterationsPerTask = 1000000
// This function determines if a point lies within the Unit Circle
let InUnitCircle x y = sqrt(x * x + y * y) <= 1.0
// This is the class that will be distributed for remote computation; the DoWork
// method is overridden, and will be invoked on the objects when
// they are on the remote nodes
type MyWorker =
    class
        inherit Digipede.Framework.Api.Worker
        val mutable _Pi : double
        val _NumIterations : int
        new(a) = { _NumIterations = a; _Pi = 0.0 }
        override x.DoWork() =
            let r = new Random(x.Task.TaskId)
            let mutable XCoord = 0.0
            let mutable YCoord = 0.0
            let mutable NumberInUnitCircle = 0
            // Run exactly _NumIterations samples (1 to n, not 0 to n)
            for i = 1 to x._NumIterations do
                XCoord <- r.NextDouble()
                YCoord <- r.NextDouble()
                NumberInUnitCircle <- NumberInUnitCircle + (if InUnitCircle XCoord YCoord then 1 else 0)
            // The fraction of points inside the quarter circle approximates pi / 4
            x._Pi <- 4.0 * double NumberInUnitCircle / (double x._NumIterations)
    end
// This function will be called (in the "master application") for each task that
// completes (see reference below)
let OnTaskCompleted (args) =
    let myArgs = (args :> Digipede.Framework.Api.TaskStatusEventArgs)
    let returnedWorker = (myArgs.Worker :?> MyWorker)
    printf " Task %d calculated %f \n" myArgs.TaskId returnedWorker._Pi
    AccumulatedPi <- AccumulatedPi + returnedWorker._Pi
    printf " Current total is %f\n" AccumulatedPi
// Ok, this is the code that is the "master" application. It will define and submit the job
// First, we instantiate a Digipede Client and tell it where the Digipede Server is
let Client = new Digipede.Framework.Api.DigipedeClient()
Client.SetUrlFromHost("myservername")
// Now, tell the system what class we're distributing. It will automatically
// determine which binaries need to be distributed to use this class
let JT = Digipede.Framework.JobTemplate.NewWorkerJobTemplate(typeof<MyWorker>)
// this line tells the Digipede Agent to run these tasks "per core" on multi-core machines
JT.Control.Concurrency <- Digipede.Framework.ApplicationConcurrency.MultiplePerCore
// Create a job, and add tasks to it. Each task gets a new MyWorker object
let aJob = new Digipede.Framework.Job()
aJob.Name <- "F# Job"
for i = 1 to numTasks do
    let myTask = new Digipede.Framework.Task()
    let mw = new MyWorker(numIterationsPerTask)
    myTask.Worker <- (mw :> Digipede.Framework.Api.Worker)
    aJob.Tasks.Add(myTask)
// Next, I set up some events.
(* This is like an anonymous delegate. It will get executed each time we get notified that a task has completed. *)
do aJob.TaskCompleted.Add(fun args -> printf "Task %d executed on %s \n" args.TaskId args.TaskStatusSummary.ComputeResourceName)
do aJob.TaskFailed.Add(fun args -> printf "Task %d failed \n" args.TaskId)
// Another way to handle events is to give it a function to call on task completion
// I'm doing both, which is redundant
do aJob.TaskCompleted.Add(fun args -> OnTaskCompleted args)
let sr = Client.SubmitJob(JT, aJob)
printf "Submitted job %s\n" (sr.JobId.ToString())
printf "Waiting for results\n"
let mybool = Client.WaitForJob(sr.JobId)
printf "Job finished\n"
AccumulatedPi <- AccumulatedPi / (double numTasks)
printf "Calculated pi is %f\n" AccumulatedPi
Threading.Thread.Sleep(5000)
As I said before: this is my very first dabble into F#, and I may be making mistakes. But it runs, and I've figured out a couple of ways to handle events.
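The two event-handling styles can be tried without a grid. Here is a minimal sketch, assuming a plain F# `Event<int>` as a hypothetical stand-in for the job's `TaskCompleted` event (the real event carries `TaskStatusEventArgs`, not an `int`). The `log` list and the handler names are illustrative only.

```fsharp
// Hypothetical stand-in for aJob.TaskCompleted; the payload is just a task id.
let taskCompleted = Event<int>()

// Collects what each handler saw, so the effect of both subscriptions is visible.
let log = ResizeArray<string>()

// Style 1: an inline lambda, much like an anonymous delegate.
taskCompleted.Publish.Add(fun taskId ->
    log.Add(sprintf "lambda handler saw task %d" taskId))

// Style 2: a named function passed as the handler.
let onTaskCompleted (taskId: int) =
    log.Add(sprintf "named handler saw task %d" taskId)

taskCompleted.Publish.Add onTaskCompleted

// Raise the event twice; both handlers run for each trigger.
taskCompleted.Trigger 1
taskCompleted.Trigger 2

for entry in log do
    printfn "%s" entry
```

Subscribing both ways to the same event is redundant, as in the listing; either style alone is enough, and the named function is the easier one to reuse or test.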