sketch of Async/Await semantics (Concurrency/Parallelism: Part 2) #3034
Replies: 5 comments 13 replies
-
As an aside, I tried an implementation of queues using |
Beta Was this translation helpful? Give feedback.
-
Awesome, thank you a lot for working on this! Regarding these points: So you really want to be invoking it as reset(Goal, Val, cont(Cont)) rather than reset(Goal, Val, Cont)
So, we cannot in general assume that ?- reset(true, B, Cont). Cont = none. The good news is that the representation of possible kinds of results is clean (as opposed to defaulty), and we can therefore for instance write: process_cont(none). process_cont(cont(C)) :- C. This was the motivation behind the API used in Scryer, also encouraged by @samer-- when the API of the original library was discussed. Please also see #136. One other point raised by @samer-- back then is that |
Beta Was this translation helpful? Give feedback.
-
I finally figured out how to use is_queue(q(0,_,_)).
is_queue(q(_N,_,_)).
?- queue(Q), is_queue(Q).
%@ Q = q(0,_A,_A)
%@ ; Q = q(0,_A,_A). This is obviously problematic. This made me realize why they don't use integers for the first argument of This allows for: is_queue(q(0,_,_)).
is_queue(q(s(N),_,_)).
?- queue(Q), is_queue(Q).
%@ Q = q(0,_A,_A)
%@ ; false. Which unfortunately still results in a redunant choice point. Fortunately, I remembered @triska had a video on this (that I thought I would never use) and it came in very handy! is_queue(q(N,A,B)) :-
is_queue_(N,A,B).
is_queue_(0,_,_).
is_queue_(s(_),_,_).
?- queue(Q), is_queue(Q).
%@ Q = q(0,_A,_A). 🥳 🥳 This makes the code MUCH cleaner! run_task_q(q(N,A,B)) :-
run_task_q_(N,A,B).
run_task_q_(s(N),A,B) :-
taskqcall_deque(q(s(N),A,B), Nq),
run_task_q(Nq).
run_task_q_(0,_,_). and for Before: taskqcall_deque(Q, Nq) :-
que_item_popped(Q,T,Q0),
reset_t(T,T1,cont(Cont),More),
if_(More=true,
( que_task_enque(Q0,T1,Nq0),
resume(Cont,Nq0,Nq)
),
Q0=Nq
).
/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Event Loop Helpers
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
reset_t(Goal, Val, Cont, T) :-
( reset(Goal,Val,Cont)
-> T=true
; T=false
).
resume(Cont,Q,Nq) :-
if_(Cont=true,
Q=Nq,
que_task_enque(Q,resume(Cont),Nq)
).
resume(C) :- C. After: taskqcall_deque(Q, Nq) :-
que_item_popped(Q,T,Q0),
handle_await(T,T1,Cont),
resume_internal(Cont,T1,Q0,Nq).
resume_internal(none,_Meaningless,Q,Q).
resume_internal(cont(Cont),Task,Q,Nq) :-
resume_internal_(Cont,Task,Q,Nq).
resume_internal_(true,Task,Q,Nq) :-
que_task_enque(Q,Task,Nq).
resume_internal_(cont:call_continuation(Args),Task,Q,Nq) :-
que_task_enque(Q,Task,Nq0),
que_task_enque(Nq0,cont:call_continuation(Args),Nq). 🥳 🥳 🥳 I am extremely pleased with this refactor, thank you for the great tutorial on this topic, @triska ! |
Beta Was this translation helpful? Give feedback.
-
An interesting application could be asynchronous read and write of files, which Windows has had since about 1992. Linux and perhaps other Unix like OS more recently. |
Beta Was this translation helpful? Give feedback.
-
I've recently also wrote a small program that speaks binary protocol via 2 pipes and I solved it by using this predicate: get_bytes(Stream, B, A) :-
get_byte(Stream, Byte), Byte >= 0, A = [+Byte|X], (X = B; get_bytes(Stream, B, X)). It is hard to do it as DCG, but nevertheless you can plug it into any DCG rule and bytes will "magically" appear one-by-one for parsing. The rule can look like: message_exchange --> get_bytes(read_pipe), request(RequestedData), response(RequestedData), put_bytes(write_pipe). Please note that input bytes have It isn't as fast as your async library, because it reads only 1 byte at a time and does a lot of backtracking, so waiting for new developments. |
Beta Was this translation helpful? Give feedback.
Uh oh!
There was an error while loading. Please reload this page.
Uh oh!
There was an error while loading. Please reload this page.
-
Update:
library(async)
PR now available!Edit: significantly cleaned up file, fixed queue implementation, fixed late binding footgun
Current Version
Yesterday, I showed some interesting concurrency semantics I stumbled across. However, there were a few issues that didn't sit well with me.
I realized that was ok for certain kinds of workflows but I was interested in a workflow that would print
which is much more in keeping with standard coroutine control flow.
After some stumbling around, I realized what I really needed was continuations. I had never really had a strong usecase for them before, and I could never quite figure out
library(cont)
-- but now I finally did. Now that I understand it, I actually like it better than the semantics in the paper!In a nutshell, it works like this:
Some important caveats -- the third argument of
reset
is a functorcont/1
. So you really want to be invoking it asreset(Goal, Val, cont(Cont))
rather thanreset(Goal, Val, Cont)
-- this was really annoying to figure out. If you invoke it asreset(Goal, Val, cont(Cont))
, thenCont
will be a goal that when invoked will backtrack to where the correspondingshift/1
was called from! As you can see, you can REPEAT the computation multiple times if desired -- VERY handy!So, using this, I changed the event loop from
to a slightly more complex (sorry)
Auxiliary Functions for the above snippet
The purpose of this is to allow ANY goal to be run with
shift/1
without needing to thread the queue arguments!So rather than needing to write
NOW you can write
Which is significantly more elegant (in my opinion). As a bonus, you get the desired output of
This still allows for parallelism, but with a much cleaner API:
Unfortunately we are missing an important tool for streaming process communication. It is very handy that
process_wait/3
has atimeout/1
option, but most blocking IO operations do not have something like that. This would make it very difficult to achieve the effect of "busy polling streams" for content without a LOT of gymnastics.Overall it mostly works very well -- there are a few footguns. For example:
So ironically, this works fine:
but if I change the definition of
random_countable_task/0
tothe query does
not(Edit: fixed) terminate. I am not sure why this is happening yet, but it's unexpected.Anyway, here is the
full file(Edit: old version) if you want to play with it:Original Version
Beta Was this translation helpful? Give feedback.
All reactions