This library provides high-level concurrency APIs.
Using low-level threads and mutexes directly sometimes causes deadlocks or incomprehensible code. This library provides frameworks for common use cases. It is built on top of SRFI-18.
A sub library of (util concurrent). This library provides future related APIs.
A future is an object that holds a task, which is a thunk, to be executed at some point in the future. In this implementation, future is an interface and how it executes depends on the sub class. The default implementation this library provides uses one thread per future. If users don't need to manage the number of threads, then using this is sufficient.
(import (rnrs) (util concurrent))

;; creates 5 futures
(define futures
  (map (lambda (i) (future (* i i))) '(1 2 3 4 5)))

;; wait and retrieve the results
(map future-get futures)
;; => (1 4 9 16 25)
The interface of all futures.
Returns #t if given obj is a future object, otherwise #f.
Creates a future which executes expr. The type of the returned future is record.
The first form is equivalent to the following:
(future (class <simple-future>) expr ...)
Retrieves the result of the given future.
This procedure waits if the execution of the future isn't finished yet.
Cancels the execution of future.
This procedure may or may not cancel the execution depending on the implementation of future. The <simple-future> provided by this library won't disturb the execution. Thus calling this procedure doesn't do anything but change the future's state.
NOTE: once this procedure is called, calling future-get with future raises a &future-terminated.
(import (rnrs) (util concurrent))

(define f (future (display "cancelled") (newline)))
(future-cancel f)
(future-get f)
;; => raises &future-terminated
The above example may or may not print "cancelled".
Returns #t if the execution of the given future is finished, otherwise #f.
Returns #t if the execution of the given future was terminated by future-cancel, otherwise #f.
Returns the current state of the given future.
The returned state depends on the implementation of future.
Only 2 states are defined in this library, done and terminated.
done is set when future-get is called.
terminated is set when future-cancel is called.
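The state rules above can be observed through the two predicates. The following is a minimal sketch; it assumes the predicates described above are exported as future-done? and future-cancelled?, since their signatures are not shown in this text.

```scheme
(import (rnrs) (util concurrent))

;; future-done? and future-cancelled? are assumed names for the
;; two predicates described above.
(define (state-predicates-demo)
  (let ((finished (future (+ 1 2)))
        (cancelled (future (+ 3 4))))
    ;; future-get waits for the task, so the future is done afterwards
    (future-get finished)
    ;; future-cancel only changes the state of a <simple-future>
    (future-cancel cancelled)
    (list (future-done? finished)
          (future-cancelled? cancelled))))

(state-predicates-demo) ;; => (#t #t)
```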
This condition type describes the case where users try to retrieve the result of a terminated future.
Returns #t if given obj is an object of &future-terminated.
condition must be a &future-terminated condition.
Retrieves the terminated future from condition.
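As a minimal sketch of handling a cancelled future, the following catches the condition with guard. It assumes the predicate and accessor described above are exported as future-terminated? and terminated-future; those names are assumptions, since the signatures are not shown in this text.

```scheme
(import (rnrs) (util concurrent))

;; future-terminated? and terminated-future are assumed names.
(define (cancelled-future-is-reported?)
  (let ((f (future (* 6 7))))
    ;; the body may or may not have run already; only the state changes
    (future-cancel f)
    (guard (e ((future-terminated? e)
               ;; the condition carries the future that was cancelled
               (eq? (terminated-future e) f)))
      (future-get f)
      #f)))

(cancelled-future-is-reported?) ;; => #t
```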
Simple future is a future implementation which executes the task on a thread immediately.
Default <future> implementation of this library.
Returns #t if given obj is a simple future, otherwise #f.
Creates a simple future which executes thunk.
A sub library of (util concurrent). This library provides executor related APIs.
An executor is an object that executes submitted futures. The idea is taken from the java.util.concurrent package. The library provides 2 types of executors, thread pool executor and fork join executor. The first one uses a thread pool, described in a section below, and the latter one just creates a thread per task. The following is an example of how to use the executors:
(import (rnrs) (util concurrent))

;; creates executor which uses 5 threads and push all tasks
(define executor
  (make-thread-pool-executor 5 push-future-handler))

;; creates 10 futures
(define futures
  (map (lambda (i)
         (future (class <executor-future>)
           (* i i)))
       '(1 2 3 4 5 6 7 8 9 10)))

;; execute futures
(for-each (lambda (future) (execute-future! executor future)) futures)

;; wait/retrieve the results
(map future-get futures)
;; => (1 4 9 16 25 36 49 64 81 100)
The thread pool executor with push-future-handler waits until the previous tasks are finished.
The executor provided by this library is extensible, so the most commonly used procedures are generic.
The interface of executor.
This record only has one field, state.
Returns #t if given obj is an executor, otherwise #f.
Returns the state field of the executor.
The behaviour of the following procedures depends on the implementation.
Returns #t if the given executor is available, otherwise #f.
Executes given future on executor.
Shuts down the given executor.
This procedure may or may not affect the managed futures on the executor.
Converts thunk to a future and executes it on the given executor, then returns the future. This procedure is defined as follows:
(define (executor-submit! e thunk)
  (let ((f (make-executor-future thunk)))
    (execute-future! e f)
    f))
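A short usage sketch of executor-submit! follows. It submits thunks directly instead of building futures by hand. The helper squares-via-executor is hypothetical, and shutdown-executor! is assumed to be the name of the shutdown procedure described above.

```scheme
(import (rnrs) (util concurrent))

;; squares-via-executor is a hypothetical helper for illustration.
(define (squares-via-executor numbers)
  (let* ((executor (make-thread-pool-executor 2 push-future-handler))
         ;; each call returns the future created for the thunk
         (futures (map (lambda (i)
                         (executor-submit! executor (lambda () (* i i))))
                       numbers))
         ;; future-get blocks until each task has finished
         (results (map future-get futures)))
    ;; shutdown-executor! is an assumed name
    (shutdown-executor! executor)
    results))

(squares-via-executor '(1 2 3 4)) ;; => (1 4 9 16)
```

push-future-handler is used here so that submitting more tasks than threads queues them instead of rejecting them.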
Thread pool executor uses (util concurrent thread-pool) for its underlying thread management. So once the threads are created, each thread holds its environment until the executor is shut down. In other words, if a task changes the dynamic environment, then the next task uses the changed dynamic environment. The following example shows how dynamic environments work on this executor:
(import (rnrs) (util concurrent) (srfi :39))

(define *one* (make-parameter 1))
(define executor (make-thread-pool-executor 1))

(let ((f1 (make-executor-future (lambda () (*one* 2) 1)))
      (f2 (make-executor-future (lambda () (*one*)))))
  (execute-future! executor f1)
  (future-get f1)
  (execute-future! executor f2)
  (future-get f2))
;; => 2
NOTE: parameter objects are thread safe in general, thus if a thread is created per future, then the parameter *one* is initialised with the initial value 1 during thread creation.
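For contrast, the same two tasks can be run as simple futures, which use one thread each. This is a sketch of what the NOTE implies; the helper name is hypothetical.

```scheme
(import (rnrs) (util concurrent) (srfi :39))

(define *one* (make-parameter 1))

;; Hypothetical helper: runs both tasks as <simple-future>s.
(define (parameter-seen-by-second-future)
  (let ((f1 (future (*one* 2) 1)))
    ;; wait until the first task has changed *one* on its own thread
    (future-get f1)
    ;; the second future runs on a fresh thread, so it does not see
    ;; the change made by the first task
    (future-get (future (*one*)))))

(parameter-seen-by-second-future) ;; => 1
```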
Record type of thread pool executor. This record type inherits <executor>.
Returns #t if given obj is a thread pool executor, otherwise #f.
Creates a thread pool executor with thread count max-thread.
If optional argument reject-handler is specified, then the specified handler is used. Otherwise, abort-rejected-handler is used.
Returns the number of futures currently executing on the given thread pool executor.
This number can be greater than the thread count if push-future-handler is specified during executor creation.
Returns the thread count of the given thread pool executor.
Returns #t if the number of executing futures is less than the thread count.
NOTE: this procedure may return #f even if tasks can be pushed to the executor when push-future-handler is specified.
Executes the given future on thread-pool-executor.
Shuts down the given thread-pool-executor.
Builtin reject handlers.
A reject handler is a procedure called when the thread pool executor is not available. It decides how the executor should treat the given future.
Rejects the future and raises &rejected-execution-error.
This is the default handler.
Terminates the oldest future.
When this handler is called, the thread on which the target future is running is also terminated. Thus the dynamic environment is also reset.
Creates a reject handler which waits until one of the threads is available.
The wait-retry is the retry count. If no future task finishes within this count, then abort-rejected-handler is called.
Pushes the task to the least used thread.
A condition that describes the rejection of a future by an executor.
Returns #t if given obj is a &rejected-execution-error object, otherwise #f.
condition must be a &rejected-execution-error object.
Retrieves the rejected future and executor, respectively.
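The default rejection behaviour can be sketched as follows. One thread is kept busy so that a second submission is rejected. The sketch assumes that the condition predicate is exported as rejected-execution-error? and the shutdown procedure as shutdown-executor!. It also assumes that an executor with a single busy thread counts as unavailable. The helper name is hypothetical.

```scheme
(import (rnrs) (util concurrent))

;; Hypothetical helper; rejected-execution-error? and
;; shutdown-executor! are assumed names.
(define (second-task-rejected?)
  (let* ((gate (make-shared-queue))
         ;; no reject handler given, so abort-rejected-handler is used
         (executor (make-thread-pool-executor 1))
         ;; this task blocks until something is put into gate
         (busy (executor-submit! executor
                                 (lambda () (shared-queue-get! gate))))
         (rejected?
          (guard (e ((rejected-execution-error? e) #t))
            (executor-submit! executor (lambda () 'never-run))
            #f)))
    ;; release the blocked task and clean up
    (shared-queue-put! gate 'go)
    (future-get busy)
    (shutdown-executor! executor)
    rejected?))
```

Passing push-future-handler or a waiting handler at creation time would queue or delay the second task instead of raising.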
Fork join executor is an executor which uses fork join pool as its underlying thread pool.
Record type of fork join executor. This record type inherits <executor>.
Returns #t if given obj is a fork join executor, otherwise #f.
Creates a fork join executor.
If the second form is used, then it uses the given parallelism as its parallelism.
If the third form is used, then the parameter must be a fork-join-pool-parameter, described in a section below, and the procedure passes the given parameter to the fork join thread pool creation.
Returns #t if the underlying thread pool is not shut down, otherwise #f.
Executes the given future on fork-join-executor.
Shuts down the given fork-join-executor. The procedure also shuts down the underlying fork join pool.
An executor future is a future object which can be used on an executor.
Record type of <executor-future>. This type inherits <future>.
Returns #t if the given obj is an executor future, otherwise #f.
Creates an executor future object.
A sub library of (util concurrent). This library provides thread pool APIs.
Creating a thread is not cheap on Sagittarius. If users want to reuse threads, then this library can be used.
(import (rnrs) (util concurrent))

;; pooling 5 threads
(define thread-pool (make-thread-pool 5))

(for-each (lambda (i) (thread-pool-push-task! thread-pool (lambda () (* i i))))
          '(1 2 3 4 5 6 7 8 9 10))

;; waits until all tasks are done
(thread-pool-wait-all! thread-pool)

;; release thread-pool
(thread-pool-release! thread-pool)
Record type of thread pool.
Returns #t if given obj is a thread-pool, otherwise #f.
Creates a thread pool with thread-count threads.
If the optional argument error-handler is given, it must be a procedure which accepts one argument. The procedure is called when a pushed task raises an error.
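The error-handler argument can be sketched as follows. The handler forwards whatever it receives to a shared queue so that the calling thread can inspect it. The helper name is hypothetical, and the exact object passed to the handler is not specified in this text, so the sketch makes no claim about it.

```scheme
(import (rnrs) (util concurrent))

;; Hypothetical helper: returns whatever the error handler received.
(define (collect-task-error)
  (let* ((errors (make-shared-queue))
         ;; the handler runs when a pushed task raises
         (pool (make-thread-pool 2
                 (lambda (e) (shared-queue-put! errors e)))))
    (thread-pool-push-task! pool (lambda () (raise 'boom)))
    ;; blocks until the handler has put something into the queue
    (let ((reported (shared-queue-get! errors)))
      (thread-pool-wait-all! pool)
      (thread-pool-release! pool)
      reported)))
```

Without a handler the results and errors of pooled tasks are not visible to the caller, so passing one is a simple way to surface failures.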
Returns number of threads on the given thread-pool.
Pushes the given thunk to the least used thread of thread-pool and returns the id of the thread it was pushed to. This id can be used to retrieve the actual thread by calling the thread-pool-thread procedure.
Waits for all the tasks pushed into the given thread-pool.
The return values of the tasks are discarded.
Joins all the threads on the given thread-pool.
If optional argument how is specified as terminate, then the procedure terminates the threads instead of joining.
NOTE: terminating a thread is a very dangerous operation, so don't use it casually.
Retrieves the pooled thread associated with given id from given thread-pool.
It signals an error if the given id is not a thread id.
Retrieves the pooled thread id associated with given thread from given thread-pool. The procedure takes O(n) time where n is the number of threads managed by the thread-pool. It might be better to use the (thread-pool-current-thread-id) procedure to retrieve the thread id from managed threads.
It signals an error if the given thread is not a managed thread.
NOTE: if the thread is terminated, then the procedure also signals an error.
Retrieves thread id of current thread. If the current thread is not a managed thread, then #f is returned.
Terminates the pooled thread associated with given id and recreates a new thread in thread-pool.
NOTE: this is a dangerous operation. Don't use it casually.
Returns #t if the thread with the given id in the thread-pool is running, otherwise #f.
A sub library of (util concurrent). This library provides fork join pool APIs.
On Sagittarius, fork join pool means work stealing pool. The pool takes a core number of threads and may create ephemeral threads until it reaches the max thread number.
CAVEAT: The implementation increases threads rather quickly, which means it reaches the max thread number very easily if the thread pool receives a large number of tasks. This behaviour may change in the near future so that threads are not created too soon.
Record type of fork join pool.
Returns #t if the given obj is a fork-join-pool, otherwise #f.
Creates a fork join pool with core thread count of core-threads.
If the optional argument parameter is given, then it must be a fork join pool parameter. The parameter controls creating fork join pool.
Returns the number of threads the given fork-join-pool currently has.
Returns max thread number of the given fork-join-pool.
Pushes the given thunk into the fork-join-pool. The thunk will be executed on the fork-join-pool when there's an available thread.
Waits for fork-join-pool to finish all tasks. The procedure blocks the calling thread and may not return if a task hangs.
Optional argument timeout specifies the timeout. It can be either an integer representing milliseconds or an absolute time.
The procedure returns #t if all the core threads are freed, otherwise #f (e.g. on timeout).
NOTE: At this moment, this procedure doesn't guarantee the tasks are finished, if it's running on a spawned thread.
Shuts down the given fork-join-pool.
This procedure discards all the threads. After this procedure is called, the given fork-join-pool is no longer available.
Returns #t if the given fork-join-pool is available.
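A minimal sketch of using a fork join pool follows. The procedure signatures are not shown in this text, so make-fork-join-pool, fork-join-pool-push-task! and fork-join-pool-shutdown! are assumed names for the constructor, push and shutdown procedures described above. Because waiting on the pool does not guarantee that every task has finished, the sketch collects results through a shared queue instead.

```scheme
(import (rnrs) (util concurrent))

;; Hypothetical helper. make-fork-join-pool, fork-join-pool-push-task!
;; and fork-join-pool-shutdown! are assumed names.
(define (sum-of-squares numbers)
  (let ((pool (make-fork-join-pool 2))
        (results (make-shared-queue)))
    ;; every task reports its result through the shared queue
    (for-each (lambda (i)
                (fork-join-pool-push-task! pool
                  (lambda () (shared-queue-put! results (* i i)))))
              numbers)
    ;; take exactly one result per pushed task; each get blocks
    ;; until a result is available
    (let loop ((remaining (length numbers)) (sum 0))
      (if (zero? remaining)
          (begin (fork-join-pool-shutdown! pool) sum)
          (loop (- remaining 1)
                (+ sum (shared-queue-get! results)))))))

(sum-of-squares '(1 2 3 4)) ;; => 30
```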
A sub library of (util concurrent). This library provides shared queue APIs.
A shared queue is a queue whose operations are done atomically.
(import (rnrs) (util concurrent) (srfi :18))

(define shared-queue (make-shared-queue))

(define thread
  (thread-start!
   (make-thread
    (lambda ()
      ;; waits until the queue has an element
      (let ((value (shared-queue-get! shared-queue)))
        (* value value))))))

(shared-queue-put! shared-queue 5)

(thread-join! thread)
;; => 25
Record type of shared queue.
Returns #t if given obj is a shared queue, otherwise #f.
Creates a shared queue.
If optional argument max-length is 0, then the queue can be used as a synchronised queue. If the value is a positive number, then the queue can only hold the specified number of elements. If it overflows, then it waits until the number of elements is less than max-length.
Returns #t if the number of elements inside of shared-queue is 0. Otherwise #f.
Returns the number of elements inside of shared-queue.
Returns max length of shared-queue.
Returns #t if putting count elements into shared-queue overflows it. Otherwise #f.
Retrieves the first element from shared-queue.
If the queue is empty and optional argument timeout is #f, then this procedure waits until the queue gets something.
If the optional argument timeout is specified, then the procedure only waits specified amount of time. timeout can be either integer or time object defined in SRFI-19. If the queue didn't get any object within the timeout, then timeout-val is returned.
Puts obj into shared-queue and returns obj.
If the queue has max-length and overflows, then it waits until it's available.
If the optional argument timeout is specified, then the procedure only waits the specified amount of time. timeout can be either an integer or a time object defined in SRFI-19. If obj could not be put within the timeout, then timeout-val is returned.
Removes the given obj from the shared-queue. The procedure returns #t if obj is removed, otherwise #f.
Optional argument =, which must be a comparison procedure, specifies how to compare the elements of the shared-queue and the given obj. The default value is equal?.
Clears all elements inside of shared-queue.
Finds an element which satisfies pred. This operation locks the given shared-queue.
Returns #t if the given shared-queue is locked by other thread, otherwise #f.
If the optional argument wait? is given, then the procedure waits until the queue is available and returns #f.
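The bounded queue and timeout behaviour described in this section can be sketched as follows. The procedure signatures are not shown in this text, so shared-queue-empty?, shared-queue-size and shared-queue-overflows? are assumed names for the procedures described above. The helper name is hypothetical.

```scheme
(import (rnrs) (util concurrent))

;; Hypothetical helper. shared-queue-empty?, shared-queue-size and
;; shared-queue-overflows? are assumed names.
(define (bounded-queue-demo)
  (let* ((q (make-shared-queue 2))            ; at most 2 elements
         (empty-before? (shared-queue-empty? q))
         (put-result (shared-queue-put! q 'a)) ; returns the object put
         (size (shared-queue-size q))
         ;; one element is queued, so two more would not fit
         (overflow? (shared-queue-overflows? q 2))
         (first (shared-queue-get! q))
         ;; the queue is empty again, so this get times out and
         ;; returns the given timeout-val
         (after-timeout (shared-queue-get! q 1 'timed-out)))
    (list empty-before? put-result size overflow? first after-timeout)))

(bounded-queue-demo) ;; => (#t a 1 #t a timed-out)
```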
A sub library of (util concurrent). This library provides actor model like APIs.
An actor is an object which contains a thread, an input receiver and an output sender. This is based on the Actor model. Communication between an actor and the outside of the actor can only be done via the input receiver or the output sender. From here on, we call them channels. The following is a simple bank account example using an actor.
(import (rnrs) (util concurrent actor) (match))

(define (open-account initial-amount)
  (define balance initial-amount)
  (make-shared-queue-channel-actor
   (lambda (input-receiver output-sender)
     (let loop ()
       (match (input-receiver)
         (('withdraw how-much)
          (if (< balance how-much)
              (output-sender "invalid amount")
              (begin
                (set! balance (- balance how-much))
                (output-sender (cons how-much balance))))
          (loop))
         (('deposit a)
          (if (negative? a)
              (output-sender "invalid amount")
              (begin
                (set! balance (+ balance a))
                (output-sender (cons 0 balance))))
          (loop))
         (('close) #t)
         (else "invalid message"))))))

(define client (open-account 1000))

(actor-send-message! client '(withdraw 100))
(actor-send-message! client '(deposit 100))
(actor-send-message! client '(close))

(actor-receive-message! client) ;; => (100 . 900)
(actor-receive-message! client) ;; => (0 . 1000)
Returns #t if the given obj is an actor, otherwise #f.
Creates actors with shared-queue or shared-priority-queue as the underlying channel implementation, respectively.
If make-shared-priority-queue-channel-actor is used, then compare must be a procedure which takes 2 arguments and returns the comparison result of the given 2 arguments. The value should be -1, 0 or 1.
task must be a procedure that accepts 2 arguments, input-receiver and output-sender. The procedures' signatures are as follows:
The input-receiver receives a message from outside of the actor.
The output-sender sends the given message to outside of the actor.
Messages can be sent to an actor via actor-send-message!, and be received from an actor via actor-receive-message!.
If the optional arguments timeout and timeout-val are given, they behave the same as in shared-queue-get! or shared-queue-put!.
Sends the given message to the actor. The operation may block the caller thread depending on the underlying channel implementation.
If the optional arguments timeout and timeout-val are given, they behave the same as in shared-queue-put!.
Receives a message from given actor. The operation may block the caller thread depending on the underlying channel implementation.
If the optional arguments timeout and timeout-val are given, they behave the same as in shared-queue-get!.
Returns #t if the given actor is running, otherwise #f.
Waits until the given actor is finished.
The optional arguments work the same as in thread-join!.
Terminates the given actor.
NOTE: This operation is not safe. It is users' responsibility to release resource if it's needed.
A sub library of (util concurrent). This library provides an interface similar to Java's CompletableFuture.
Provides a future whose value supplier is the thunk.
If the second form is used, then the execution will be done by the given executor, otherwise *completable-future:default-executor* will be used.
proc must accept the same number of arguments as the given futures.
Applies the procedure proc to the results of the futures and returns a newly created future.
These procedures return immediately and the computation of proc will be done in some future.
The same as future-map, the only difference is that it takes executor as its execution environment.
proc must accept the same number of arguments as the given futures, and return a future.
Applies the procedure proc to the results of the futures and returns a newly created future, which returns the result of the future returned by proc.
These procedures return immediately and the computation of proc will be done in some future.
The same as future-flatmap, the only difference is that it takes executor as its execution environment.
Guards the future and applies proc to the raised condition.
(future-get (future-guard (lambda (e) 'ok)
                          (thunk->future (lambda () (raise 'boo)))))
;; => ok
These procedures return immediately and the computation of proc will be done in some future.
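The combinators above can be chained, as in the following minimal sketch. It assumes that thunk->future, future-map and future-flatmap are reachable through (util concurrent) like the other sub libraries. It also assumes that proc is the first argument, as in the future-guard example. The helper name is hypothetical.

```scheme
(import (rnrs) (util concurrent))

;; Hypothetical helper. Assumes proc comes first, followed by the
;; futures whose results are passed to it.
(define (completable-demo)
  (let* ((f1 (thunk->future (lambda () 1)))
         (f2 (thunk->future (lambda () 2)))
         ;; proc takes one argument per future
         (sum (future-map + f1 f2))
         ;; proc returns a future; its result becomes the result
         ;; of the future returned by future-flatmap
         (doubled (future-flatmap
                   (lambda (v) (thunk->future (lambda () (* 2 v))))
                   sum)))
    (list (future-get sum) (future-get doubled))))

(completable-demo) ;; => (3 6)
```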