300 lines
7.6 KiB
JavaScript
300 lines
7.6 KiB
JavaScript
var Channel = require("sync-channel");
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// Channel
|
|
|
|
Channel.prototype.readEvent = function () {
|
|
var channel = this;
|
|
return new Event(function (baseEvents, wrapFunction, abortChannel, cont) {
|
|
var baseEvent = new ReadEvent(channel, wrapFunction);
|
|
baseEvents.push(baseEvent);
|
|
cont(null);
|
|
});
|
|
};
|
|
|
|
Channel.prototype.writeEvent = function (value) {
|
|
var channel = this;
|
|
return new Event(function (baseEvents, wrapFunction, abortChannel, cont) {
|
|
var baseEvent = new WriteEvent(channel, wrapFunction, value);
|
|
baseEvents.push(baseEvent);
|
|
cont(null);
|
|
});
|
|
};
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// ReadEvent
|
|
|
|
function ReadEvent(channel, wrapFunction) {
|
|
this.channel = channel;
|
|
this.wrapFunction = wrapFunction;
|
|
}
|
|
|
|
ReadEvent.prototype.poll = function () {
|
|
return this.channel.tryRead();
|
|
};
|
|
|
|
ReadEvent.prototype.wait = function (cont) {
|
|
return this.channel.read(cont);
|
|
};
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// WriteEvent
|
|
|
|
function WriteEvent(channel, wrapFunction, value) {
|
|
this.channel = channel;
|
|
this.wrapFunction = wrapFunction;
|
|
this.value = value;
|
|
};
|
|
|
|
WriteEvent.prototype.poll = function () {
|
|
return this.channel.tryWrite(this.value) ? { value: null } : null;
|
|
};
|
|
|
|
WriteEvent.prototype.wait = function (cont) {
|
|
return this.channel.write(this.value, cont);
|
|
};
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// Event
|
|
|
|
function Event(prepare) {
|
|
this.prepare = prepare;
|
|
}
|
|
|
|
Event.prototype.wrap = function (f) {
|
|
return wrap(this, f);
|
|
};
|
|
|
|
Event.prototype.wrapAbort = function (f) {
|
|
return wrapAbort(this, f);
|
|
};
|
|
|
|
Event.prototype.or = function (event) {
|
|
return chooseBinary(this, event);
|
|
};
|
|
|
|
Event.prototype.sync = function (cont) {
|
|
return sync(this, cont);
|
|
};
|
|
|
|
Event.prototype.timeout = function (ms) {
|
|
return timeout(this, ms);
|
|
};
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// always/never
|
|
|
|
function always(value) {
|
|
var channel = new Channel();
|
|
(function loop() {
|
|
channel.write(value, loop);
|
|
}());
|
|
return channel.readEvent();
|
|
}
|
|
|
|
function never() {
|
|
return new Channel().readEvent();
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// timeout
|
|
|
|
function timeout(event, ms) {
|
|
return guard(function (cont) {
|
|
var channel = new Channel();
|
|
var timeout = setTimeout(function () {
|
|
channel.writeEvent(true).sync(function () {
|
|
});
|
|
}, ms);
|
|
cont(null, choose([
|
|
event.wrap(function (value, cont) {
|
|
clearTimeout(timeout);
|
|
cont(null, { value: value });
|
|
}),
|
|
channel.readEvent().wrap(function (value, cont) {
|
|
cont(null, { timeout: true });
|
|
})
|
|
]).wrapAbort(function () {
|
|
clearTimeout(timeout);
|
|
}));
|
|
});
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// operators
|
|
|
|
function guard(f) {
|
|
return new Event(function (baseEvents, wrapFunction, abortChannel, cont) {
|
|
f(function (error, event) {
|
|
if (error) {
|
|
cont(error);
|
|
} else {
|
|
event.prepare(baseEvents, wrapFunction, abortChannel, cont);
|
|
}
|
|
});
|
|
});
|
|
}
|
|
|
|
function wrap(event, f) {
|
|
return new Event(function (baseEvents, wrapFunction, abortChannel, cont) {
|
|
event.prepare(baseEvents, function (value, cont) {
|
|
wrapFunction(value, function (error, value) {
|
|
if (error) {
|
|
cont(error);
|
|
} else {
|
|
f(value, cont);
|
|
}
|
|
})
|
|
}, abortChannel, cont);
|
|
});
|
|
}
|
|
|
|
function wrapAbort(event, f) {
|
|
return new Event(function (baseEvents, wrapFunction, abortChannel, cont) {
|
|
var index = baseEvents.length;
|
|
event.prepare(baseEvents, wrapFunction, abortChannel, function (error) {
|
|
if (error) {
|
|
cont(error);
|
|
} else {
|
|
var childBaseEvents = baseEvents.slice(index);
|
|
abortChannel.read(function (baseEvent) {
|
|
if (childBaseEvents.indexOf(baseEvent) === -1) {
|
|
f();
|
|
}
|
|
});
|
|
cont(null);
|
|
}
|
|
});
|
|
});
|
|
}
|
|
|
|
function chooseBinary(e1, e2) {
|
|
return new Event(function (baseEvents, wrapFunction, abortChannel, cont) {
|
|
e1.prepare(baseEvents, wrapFunction, abortChannel, function (error) {
|
|
if (error) {
|
|
cont(error);
|
|
} else {
|
|
e2.prepare(baseEvents, wrapFunction, abortChannel, cont);
|
|
}
|
|
});
|
|
});
|
|
}
|
|
|
|
function choose(events) {
|
|
return events.reduce(chooseBinary);
|
|
}
|
|
|
|
function select(events, cont) {
|
|
sync(choose(events), cont);
|
|
}
|
|
|
|
function range(a, b) {
|
|
var l = [];
|
|
for (var i = a; i <= b; i++) l.push(i);
|
|
return l;
|
|
}
|
|
|
|
function randomize(l) {
|
|
for (var i = 0; i < l.length; i++) {
|
|
var j = Math.floor(Math.random() * (i + 1));
|
|
var t = l[i];
|
|
l[i] = l[j];
|
|
l[j] = t;
|
|
}
|
|
}
|
|
|
|
function pollBaseEvents(baseEvents, cont) {
|
|
var cancelFunctions = [];
|
|
|
|
function ret(baseEvent, value) {
|
|
cancelFunctions.forEach(function (cancel) {
|
|
cancel();
|
|
});
|
|
cont(baseEvent, value);
|
|
}
|
|
|
|
baseEvents.forEach(function (baseEvent) {
|
|
var pollResult = baseEvent.poll();
|
|
if (pollResult !== null) {
|
|
ret(baseEvent, pollResult.value);
|
|
} else {
|
|
var cancel = baseEvent.wait(function (value) {
|
|
ret(baseEvent, value);
|
|
});
|
|
cancelFunctions.push(cancel);
|
|
}
|
|
});
|
|
}
|
|
|
|
function sync(event, cont) {
|
|
cont = cont || function () {
|
|
};
|
|
var baseEvents = [];
|
|
var wrapFunction = function (value, cont) {
|
|
cont(null, value);
|
|
};
|
|
var abortChannel = new Channel();
|
|
function ret(baseEvent, value) {
|
|
baseEvent.wrapFunction(value, cont);
|
|
(function spawnAbortFunction() {
|
|
abortChannel.write(baseEvent, spawnAbortFunction);
|
|
}());
|
|
}
|
|
function pollBaseEvents() {
|
|
(function loop(n) {
|
|
if(n === baseEvents.length) {
|
|
waitBaseEvents();
|
|
} else {
|
|
var baseEvent = baseEvents[n];
|
|
var result = baseEvent.poll();
|
|
if(result !== null) {
|
|
ret(baseEvent, result.value);
|
|
} else {
|
|
setImmediate(function() { loop(n + 1); });
|
|
}
|
|
}
|
|
})(0);
|
|
}
|
|
function waitBaseEvents() {
|
|
var cancelFunctions = [];
|
|
var done = false;
|
|
(function loop(n) {
|
|
if(!done && n !== baseEvents.length) {
|
|
var baseEvent = baseEvents[n];
|
|
var cancelFunction = baseEvent.wait(function (value) {
|
|
cancelFunctions.forEach(function (cancelFunction) {
|
|
cancelFunction();
|
|
});
|
|
done = true;
|
|
ret(baseEvent, value);
|
|
});
|
|
cancelFunctions.push(cancelFunction);
|
|
setImmediate(function() { loop(n + 1); });
|
|
}
|
|
})(0);
|
|
}
|
|
event.prepare(baseEvents, wrapFunction, abortChannel, function (error) {
|
|
if (error) return cont(error);
|
|
randomize(baseEvents);
|
|
pollBaseEvents();
|
|
});
|
|
}
|
|
|
|
function newChannel() {
|
|
return new Channel();
|
|
}
|
|
|
|
module.exports = {
|
|
Channel: Channel,
|
|
newChannel: newChannel,
|
|
guard: guard,
|
|
wrap: wrap,
|
|
wrapAbort: wrapAbort,
|
|
chooseBinary: chooseBinary,
|
|
choose: choose,
|
|
select: select,
|
|
sync: sync,
|
|
timeout: timeout,
|
|
never: never,
|
|
always: always
|
|
}; |