Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 30 additions & 7 deletions src/lime/app/Future.hx
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ import lime.utils.Log;
var promise = new Promise<T>();
promise.future = this;

FutureWork.run(work, promise);
FutureWork.runSimpleJob(work, promise);
}
else
#end
Expand Down Expand Up @@ -308,7 +308,6 @@ import lime.utils.Log;
}
}

#if (lime_threads && !html5)
/**
The class that handles asynchronous `work` functions passed to `new Future()`.
**/
Expand All @@ -319,26 +318,42 @@ import lime.utils.Log;
@:dox(hide) class FutureWork
{
private static var threadPool:ThreadPool;
private static var promises:Map<Int, {complete:Dynamic->Dynamic, error:Dynamic->Dynamic}>;
private static var promises:Map<Int, {complete:Dynamic->Dynamic, error:Dynamic->Dynamic, progress:Int->Int->Dynamic}>;

public static var minThreads(default, set):Int = 0;
public static var maxThreads(default, set):Int = 1;
public static var activeJobs(get, never):Int;

@:allow(lime.app.Promise)
private static inline function cancelJob(id:Int):Void
{
threadPool.cancelJob(id);
}

#if (lime_threads && !html5)
@:allow(lime.app.Future)
private static function run<T>(work:Void->T, promise:Promise<T>):Void
private static function runSimpleJob<T>(work:Void->T, promise:Promise<T>):Void
{
run(threadPool_doWork, promise, work, MULTI_THREADED);
}
#end

@:allow(lime.app.Promise)
private static function run<T>(work:WorkFunction<State->WorkOutput->Void>, promise:Promise<T>, state:State, mode:ThreadMode):Int
{
if (threadPool == null)
{
threadPool = new ThreadPool(minThreads, maxThreads, MULTI_THREADED);
threadPool.onComplete.add(threadPool_onComplete);
threadPool.onError.add(threadPool_onError);
threadPool.onProgress.add(threadPool_onProgress);

promises = new Map();
}

var jobID:Int = threadPool.run(threadPool_doWork, work);
promises[jobID] = {complete: promise.complete, error: promise.error};
var jobID:Int = threadPool.run(work, state, mode);
promises[jobID] = {complete: promise.complete, error: promise.error, progress: promise.progress};
return jobID;
}

// Event Handlers
Expand Down Expand Up @@ -368,6 +383,15 @@ import lime.utils.Log;
promise.error(error);
}

private static function threadPool_onProgress(progress:{progress:Int, total:Int}):Void
{
// ThreadPool doesn't enforce types, so check manually
if (Type.typeof(progress) == TObject && Type.typeof(progress.progress) == TInt && Type.typeof(progress.total) == TInt)
{
promises[threadPool.activeJob.id].progress(progress.progress, progress.total);
}
}

// Getters & Setters
@:noCompletion private static inline function set_minThreads(value:Int):Int
{
Expand All @@ -392,4 +416,3 @@ import lime.utils.Log;
return threadPool != null ? threadPool.activeJobs : 0;
}
}
#end
96 changes: 86 additions & 10 deletions src/lime/app/Promise.hx
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package lime.app;

import lime.app.Future;
import lime.system.ThreadPool;
import lime.system.WorkOutput;

/**
`Promise` is an implementation of Futures and Promises, with the exception that
in addition to "success" and "failure" states (represented as "complete" and "error"),
Expand All @@ -10,18 +14,20 @@ package lime.app;
for recipients of it's `Future` object. For example:

```haxe
function examplePromise ():Future<String> {

var promise = new Promise<String> ();
function examplePromise():Future<String>
{
var promise = new Promise<String>();

var progress = 0, total = 10;
var timer = new Timer (100);
timer.run = function () {
var timer = new Timer(100);
timer.run = function()
{

promise.progress (progress, total);
progress++;

if (progress == total) {
if (progress == total)
{

promise.complete ("Done!");
timer.stop ();
Expand All @@ -31,12 +37,11 @@ package lime.app;
};

return promise.future;

}

var future = examplePromise ();
future.onComplete (function (message) { trace (message); });
future.onProgress (function (loaded, total) { trace ("Progress: " + loaded + ", " + total); });
var future = examplePromise();
future.onComplete(function(message) { trace(message); });
future.onProgress(function(loaded, total) { trace("Progress: " + loaded + ", " + total); });
```
**/
#if !lime_debug
Expand Down Expand Up @@ -69,6 +74,8 @@ class Promise<T>
**/
public var isError(get, never):Bool;

private var jobID:Int = -1;

#if commonjs
private static function __init__()
{
Expand Down Expand Up @@ -96,11 +103,23 @@ class Promise<T>
**/
public function complete(data:T):Promise<T>
{
if (!ThreadPool.isMainThread())
{
haxe.MainLoop.runInMainThread(complete.bind(data));
return this;
}

if (!future.isError)
{
future.isComplete = true;
future.value = data;

if (jobID != -1)
{
FutureWork.cancelJob(jobID);
jobID = -1;
}

if (future.__completeListeners != null)
{
for (listener in future.__completeListeners)
Expand All @@ -115,6 +134,45 @@ class Promise<T>
return this;
}

/**
Runs the given function asynchronously, and resolves this `Promise` with
the complete, error, and/or progress events sent by that function.
Sample usage:

```haxe
function examplePromise():Future<String>
{
var promise = new Promise<String>();
promise.completeAsync(function(state:State, output:WorkOutput):Void
{
output.sendProgress({progress:state.progress, total:10});
state.progress++;

if (state.progress == 10)
{
output.sendComplete("Done!");
}
},
{progress: 0}, MULTI_THREADED);

return promise.future;
}

var future = examplePromise();
future.onComplete(function(message) { trace(message); });
future.onProgress(function(loaded, total) { trace("Progress: " + loaded + ", " + total); });
```

@param doWork A function to perform work asynchronously. For best results,
see the guidelines in the `ThreadPool` class overview.
@param state The value to pass to `doWork`.
@param mode Which mode to run the job in: `SINGLE_THREADED` or `MULTI_THREADED`.
**/
public function completeAsync(doWork:WorkFunction<State->WorkOutput->Void>, ?state:State, ?mode:ThreadMode = MULTI_THREADED):Void
{
jobID = FutureWork.run(doWork, this, state, mode);
}

/**
Resolves this `Promise` with the complete, error and/or progress state
of another `Future`
Expand All @@ -137,11 +195,23 @@ class Promise<T>
**/
public function error(msg:Dynamic):Promise<T>
{
if (!ThreadPool.isMainThread())
{
haxe.MainLoop.runInMainThread(error.bind(msg));
return this;
}

if (!future.isComplete)
{
future.isError = true;
future.error = msg;

if (jobID != -1)
{
FutureWork.cancelJob(jobID);
jobID = -1;
}

if (future.__errorListeners != null)
{
for (listener in future.__errorListeners)
Expand All @@ -164,6 +234,12 @@ class Promise<T>
**/
public function progress(progress:Int, total:Int):Promise<T>
{
if (!ThreadPool.isMainThread())
{
haxe.MainLoop.runInMainThread(this.progress.bind(progress, total));
return this;
}

if (!future.isError && !future.isComplete)
{
if (future.__progressListeners != null)
Expand Down
4 changes: 4 additions & 0 deletions src/lime/system/WorkOutput.hx
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,10 @@ class JobData
private inline function new(doWork:WorkFunction<State->WorkOutput->Void>, state:State, ?id:Int)
{
this.id = id != null ? id : nextID++;
if (this.id == -1)
{
throw "All job IDs have been used!";
}
this.doWork = doWork;
this.state = state;
}
Expand Down
Loading