(util concurrent) - Concurrency utilities

Library (util concurrent)

This library provides high level concurrency APIs.

Working directly with low-level threads and mutexes can easily lead to deadlocks or incomprehensible code. This library provides frameworks for common use cases. It is built on top of SRFI-18.

Future

A sub library of (util concurrent). This library provides future related APIs.

A future is an object that holds a task, which is a thunk, to be executed at some point in the future. In this implementation, future is an interface, and how the task is executed depends on the subclass. The default implementation provided by this library uses one thread per future. If users don't need to manage the number of threads, this is sufficient.

(import (rnrs) (util concurrent))

;; creates 5 futures
(define futures
  (map (lambda (i) (future (* i i))) '(1 2 3 4 5)))

;; wait and retrieve the results
(map future-get futures)
;; => (1 4 9 16 25)

Record <future>

The interface of all futures.

Function future? obj

Returns #t if given obj is a future object, otherwise #f.

Creates a future which executes expr. The type of the returned future is record.

The first form is equivalent to the following: (future (class <simple-future>) expr ...)

Retrieves the result of the given future.

This procedure waits if the execution of the future isn't finished yet.

Cancels the execution of future.

This procedure may or may not cancel the execution, depending on the implementation of the future. The <simple-future> provided by this library won't disturb the execution. Thus calling this procedure does nothing but change the future's state.

NOTE: once this procedure is called, calling future-get with future raises a &future-terminated.

(import (rnrs) (util concurrent))

(define f (future (display "cancelled") (newline)))
(future-cancel f)
(future-get f)
;; raises &future-terminated

The above example may or may not print "cancelled".
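Because retrieving the result of a cancelled future raises, callers may want to guard the call. The following is a minimal sketch, not the library's prescribed idiom: it uses a catch-all guard clause instead of a specific predicate, and the names f and outcome are only illustrative.

```scheme
(import (rnrs) (util concurrent))

;; a future whose result we decide not to wait for
(define f (future (* 6 7)))
(future-cancel f)

;; once cancelled, future-get raises, so map any raised object to a symbol
(define outcome
  (guard (e (#t 'terminated))
    (future-get f)))
```

In real code a narrower guard clause would be preferable, so that unrelated errors are not swallowed.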

Returns #t if given execution of future is finished, otherwise #f.

Returns #t if given execution of future is terminated by future-cancel, otherwise #f.

Returns current state of the given future.

The returned state depends on the implementation of the future. Only 2 states are defined in this library: done and terminated.

done is set when future-get is called.

terminated is set when future-cancel is called.

Condition Type &future-terminated

This condition is raised when users try to retrieve the result of a terminated future.

Returns #t if given obj is a &future-terminated condition, otherwise #f.

condition must be a &future-terminated condition.

Retrieves the terminated future from condition.

Simple future

Simple future is a future implementation which executes the task on a thread immediately.

Default <future> implementation of this library.

Returns #t if given obj is a simple future, otherwise #f.

Creates a simple future which executes thunk.

Executor

A sub library of (util concurrent). This library provides executor related APIs.

An executor is an object that executes submitted futures. The idea is taken from the java.util.concurrent package. The library provides 2 types of executors: thread pool executor and fork join executor. The first one uses a thread pool, described in a section below, and the latter just creates a thread per task. The following example shows how to use the executors:

(import (rnrs) (util concurrent))

;; creates an executor which uses 5 threads and pushes all tasks
(define executor 
  (make-thread-pool-executor 5 push-future-handler))

;; creates 10 futures
(define futures 
  (map (lambda (i) 
         (future (class <executor-future>)
           (* i i)))
       '(1 2 3 4 5 6 7 8 9 10)))

;; execute futures
(for-each (lambda (future) (execute-future! executor future)) futures)

;; wait/retrieve the results
(map future-get futures)
;; => (1 4 9 16 25 36 49 64 81 100)

The thread pool executor with push-future-handler waits until the previous tasks are finished.

Generic Executor APIs

The executors provided by this library are extensible, so the most commonly used procedures are generic.

The interface of executor.

This record only has one field, state.

Returns #t if given obj is an executor, otherwise #f.

Returns state field of the executor.

The behaviour of the following procedures depends on the implementation.

Returns #t if the given executor is available, otherwise #f.

Executes given future on executor.

Shuts down the given executor.

This procedure may or may not affect the managed futures on the executor.

Converts thunk to a future, executes it on the given executor, then returns the future. This procedure is defined as follows:

(define (executor-submit! e thunk)
  (let ((f (make-executor-future thunk)))
    (execute-future! e f)
    f))
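As a usage sketch of the definition above, the following submits a few thunks and collects their results. The names executor, futures and results are illustrative, and the executor is deliberately not shut down here to keep the sketch short.

```scheme
(import (rnrs) (util concurrent))

;; two pooled threads; extra tasks are queued by push-future-handler
(define executor
  (make-thread-pool-executor 2 push-future-handler))

;; executor-submit! wraps each thunk in a future and returns that future
(define futures
  (map (lambda (i)
         (executor-submit! executor (lambda () (* i 2))))
       '(1 2 3)))

;; future-get waits for each task in turn
(define results (map future-get futures))
```

Compared with creating the futures by hand and calling execute-future!, this keeps creation and submission in one step.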

Thread pool executor

Thread pool executor uses (util concurrent thread-pool) for its underlying thread management. Once the threads are created, each thread keeps its environment until the executor is shut down. In other words, if a task changes the dynamic environment, then the next task sees the changed dynamic environment. The following example shows how dynamic environments work on this executor:

(import (rnrs) (util concurrent) (srfi :39))

(define *one* (make-parameter 1))

(define executor (make-thread-pool-executor 1))

(let ((f1 (make-executor-future (lambda () (*one* 2) 1)))
      (f2 (make-executor-future (lambda () (*one*)))))
  (execute-future! executor f1)
  (future-get f1)
  (execute-future! executor f2)
  (future-get f2))
;; => 2

NOTE: parameter objects are thread safe in general, thus if a thread is created per future, then the parameter *one* is initialised with the initial value 1 during thread creation.
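One way to avoid leaking a changed parameter into later tasks is to change it with parameterize rather than by direct assignment, since parameterize restores the previous value when its body exits. The following is a sketch under that assumption; *level*, inside and after are illustrative names, and push-future-handler is used only so that the second submission cannot be rejected.

```scheme
(import (rnrs) (util concurrent) (srfi :39))

(define *level* (make-parameter 1))

;; a single pooled thread, so both tasks run on the same thread
(define executor
  (make-thread-pool-executor 1 push-future-handler))

;; the first task changes the parameter only for the extent of its body
(define inside
  (future-get
   (executor-submit! executor
                     (lambda () (parameterize ((*level* 2)) (*level*))))))

;; the second task should see the original value again
(define after
  (future-get (executor-submit! executor (lambda () (*level*)))))
```

With this pattern, inside is expected to be 2 and after is expected to be 1.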

Record type of thread pool executor. This record type inherits <executor>.

Returns #t if given obj is a thread pool executor, otherwise #f.

Creates a thread pool executor with thread count max-thread.

If optional argument reject-handler is specified, then the specified handler is used. Otherwise, abort-rejected-handler is used.

Returns the number of futures currently executing on the given thread pool executor.

This number can be greater than the thread count if push-future-handler is specified when the executor is created.

Returns the thread count of the given thread pool executor.

Returns #t if the number of executing futures is less than the thread count.

NOTE: this procedure may return #f even if tasks can still be pushed to the executor when push-future-handler is specified.

Executes the given future on thread-pool-executor.

Shuts down the given thread-pool-executor.

Builtin reject handlers.

A reject handler is a procedure called when the thread pool executor is not available; it decides how the executor should treat the given future.

Rejects the future and raises &rejected-execution-error.

This is the default handler.
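The following sketch illustrates the default handler: the single pooled thread is kept busy, so a second submission is expected to be rejected. It uses a shared queue (described in a later section) as a gate and a catch-all guard clause. The names gate, first-future, second-outcome and first-result are illustrative only.

```scheme
(import (rnrs) (util concurrent))

;; one pooled thread and no reject handler given, so the default applies
(define executor (make-thread-pool-executor 1))

;; the first task blocks on this queue until we release it
(define gate (make-shared-queue))

(define first-future
  (executor-submit! executor (lambda () (shared-queue-get! gate))))

;; the only thread is occupied, so this submission should raise
(define second-outcome
  (guard (e (#t 'rejected))
    (executor-submit! executor (lambda () 'never-runs))
    'accepted))

;; release the first task and collect its result
(shared-queue-put! gate 'released)
(define first-result (future-get first-future))
```

If rejection is not acceptable for an application, one of the other built-in handlers can be passed when the executor is created.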

Terminates the oldest future.

When this handler is called, the thread on which the target future is running is also terminated. Thus the dynamic environment is also reset.

Creates a reject handler which waits until one of the threads is available.

The wait-retry is the retry count. If no future task finishes within this count, then abort-rejected-handler is called.

Pushes the task to the least used thread.

A condition raised when a future is rejected by an executor.

Returns #t if given obj is a &rejected-execution-error object, otherwise #f.

condition must be a &rejected-execution-error object.

Retrieves the rejected future and executor, respectively.

Fork join executor

Fork join executor is an executor which uses fork join pool as its underlying thread pool.

Record type of fork join executor. This record type inherits <executor>.

Returns #t if given obj is a fork join executor, otherwise #f.

Creates a fork join executor.

If the second form is used, then it uses the given parallelism as its parallelism.

If the third form is used, then the parameter must be a fork-join-pool-parameter, described in a section below, and the procedure passes the given parameter to the fork join thread pool creation.

Returns #t if the underlying thread pool is not shut down, otherwise #f.

Executes the given future on fork-join-executor.

Shuts down the given fork-join-executor. The procedure also shuts down the underlying fork join pool.

Executor future

An executor future is a future object which can be used on an executor.

Record type of <executor-future>. This type inherits <future>.

Returns #t if the given obj is an executor future, otherwise #f.

Creates an executor future object.

Thread pool

A sub library of (util concurrent). This library provides thread pool APIs.

Creating a thread is not cheap on Sagittarius. If users want to reuse threads, this library can be used.

(import (rnrs) (util concurrent))

;; pools 5 threads
(define thread-pool (make-thread-pool 5))

(for-each (lambda (i) (thread-pool-push-task! thread-pool (lambda () (* i i))))
          '(1 2 3 4 5 6 7 8 9 10))

;; waits until all tasks are done
(thread-pool-wait-all! thread-pool)

;; release thread-pool
(thread-pool-release! thread-pool)

Record type <thread-pool>

Record type of thread pool.

Returns #t if given obj is a thread-pool, otherwise #f.

Creates a thread pool with thread-count threads.

If the optional argument error-handler is given, it must be a procedure which accepts one argument. The procedure is called when a pushed task raises an error.
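As a sketch of how an error handler might be used, the following forwards the raised object to a shared queue (described in a later section) so that another thread can inspect it. The names errors, pool and caught are illustrative, and the sketch assumes the handler receives the raised object itself.

```scheme
(import (rnrs) (util concurrent))

;; raised objects are handed over to the main thread through this queue
(define errors (make-shared-queue))

;; one pooled thread whose error handler records what was raised
(define pool
  (make-thread-pool 1 (lambda (e) (shared-queue-put! errors e))))

;; this task fails on purpose
(thread-pool-push-task! pool (lambda () (raise 'boom)))

;; blocks until the handler has stored the raised object
(define caught (shared-queue-get! errors))

(thread-pool-release! pool)
```

Without a handler like this, a failure inside a pooled task would not be visible to the thread that pushed it.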

Returns the number of threads in the given thread-pool.

Pushes the given thunk to the least used thread of thread-pool and returns the id of the thread it was pushed to. This id can be used to retrieve the actual thread by calling the thread-pool-thread procedure.

Waits for all the tasks pushed into the given thread-pool.

The return values of the tasks are discarded.
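Since the return values are discarded, tasks that need to report a result have to store it somewhere themselves. The sketch below collects results in a shared queue (described in a later section). The names pool, results and squares are illustrative.

```scheme
(import (rnrs) (util concurrent))

(define pool (make-thread-pool 2))

;; tasks publish their results here instead of returning them
(define results (make-shared-queue))

(for-each
 (lambda (i)
   (thread-pool-push-task!
    pool
    (lambda () (shared-queue-put! results (* i i)))))
 '(1 2 3))

(thread-pool-wait-all! pool)
(thread-pool-release! pool)

;; completion order depends on scheduling, so sort before use
(define squares
  (list-sort <
             (map (lambda (ignored) (shared-queue-get! results))
                  '(1 2 3))))
```

If a result per task is needed together with waiting and cancellation, the executor and future APIs described earlier may be a better fit.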

Joins all the threads in the given thread-pool.

If the optional argument how is specified as terminate, then the procedure terminates the threads instead of joining them.

NOTE: terminating a thread is a very dangerous operation, so don't use it casually.

Retrieves the pooled thread associated with given id from given thread-pool.

It signals an error if the given id is not a thread id.

Retrieves the pooled thread id associated with given thread from given thread-pool. The procedure takes O(n) time, where n is the number of threads managed by the thread-pool. It might be better to use the (thread-pool-current-thread-id) procedure to retrieve the thread id from within managed threads.

It signals an error if the given thread is not a managed thread.

NOTE: if the thread is terminated, then the procedure also signals an error.

Retrieves thread id of current thread. If the current thread is not a managed thread, then #f is returned.

Terminates the pooled thread associated with given id and creates a new thread in thread-pool in its place.

NOTE: this is a dangerous operation. Don't use it casually.

Returns #t if the thread with the given id in the thread-pool is running, otherwise #f.

Fork join pool

A sub library of (util concurrent). This library provides fork join pool APIs.

On Sagittarius, fork join pool means work stealing pool. The pool holds a core number of threads and may create ephemeral threads until it reaches the max thread number.

CAVEAT: The implementation adds threads rather quickly, which means it reaches the max thread number very easily if the thread pool receives a large number of tasks. This behaviour may change in the near future so that threads are not created too soon.

Record type <fork-join-pool>

Record type of fork join pool.

Returns #t if the given obj is a fork-join-pool, otherwise #f.

Creates a fork join pool whose core thread count is core-threads.

If the optional argument parameter is given, then it must be a fork join pool parameter. The parameter controls how the fork join pool is created.

Returns the number of threads the given fork-join-pool currently has.

Returns max thread number of the given fork-join-pool.

Pushes the given thunk into the fork-join-pool. The thunk will be executed on the fork-join-pool when there's an available thread.

Waits for fork-join-pool to finish all tasks. The procedure blocks the calling thread and may not return if a task hangs.

The optional argument timeout specifies the timeout. It can be either an integer representing milliseconds or an absolute time.

The procedure returns #t if all the core threads are freed, otherwise #f (e.g. on timeout).

NOTE: At this moment, this procedure doesn't guarantee that a task is finished if it is running on a spawned thread.

Shuts down the given fork-join-pool. This procedure discards all the threads. After this procedure is called, the given fork-join-pool is no longer available.

Returns #t if the given fork-join-pool is available.

Shared queues

A sub library of (util concurrent). This library provides shared queue APIs.

A shared queue is a queue whose operations are done atomically.

(import (rnrs) (util concurrent) (srfi :18))

(define shared-queue (make-shared-queue))

(define thread 
  (thread-start!
    (make-thread
      (lambda ()
        ;; waits until the queue has an element
        (let ((value (shared-queue-get! shared-queue)))
          (* value value))))))

(shared-queue-put! shared-queue 5)

(thread-join! thread)
;; => 25

Record type of shared queue.

Returns #t if given obj is a shared queue, otherwise #f.

Creates a shared queue.

If the optional argument max-length is 0, then the queue can be used as a synchronised queue. If the value is a positive number, then the queue can hold at most that many elements. When it is full, a put operation waits until the number of elements is less than max-length.

Returns #t if the number of elements inside of shared-queue is 0. Otherwise #f.

Returns the number of elements inside of shared-queue.

Returns max length of shared-queue.

Returns #t if putting count elements into shared-queue would overflow it, otherwise #f.

Retrieves the first element from shared-queue.

If the queue is empty and optional argument timeout is #f, then this procedure waits until the queue gets something.

If the optional argument timeout is specified, then the procedure only waits for the specified amount of time. timeout can be either an integer or a time object defined in SRFI-19. If the queue doesn't get any object within the timeout, then timeout-val is returned.

Puts obj into shared-queue and returns obj.

If the queue has a max-length and is full, then it waits until space is available.

If the optional argument timeout is specified, then the procedure only waits for the specified amount of time. timeout can be either an integer or a time object defined in SRFI-19. If the queue doesn't become available within the timeout, then timeout-val is returned.
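The timeout behaviour of both operations can be sketched as follows. The names empty-queue, bounded, got and put-result are illustrative, and the integer timeout 1 is chosen only to keep the wait short; the unit of an integer timeout is not stated in this section.

```scheme
(import (rnrs) (util concurrent))

;; nothing is ever put into this queue, so the get gives up
;; after the timeout and returns the fallback value
(define empty-queue (make-shared-queue))
(define got (shared-queue-get! empty-queue 1 'nothing))

;; a queue limited to one element
(define bounded (make-shared-queue 1))
(shared-queue-put! bounded 'first)

;; the queue is full and nobody consumes from it, so the put
;; gives up after the timeout and returns the fallback value
(define put-result (shared-queue-put! bounded 'second 1 'full))
```

Choosing a timeout-val that can never be a legitimate element makes it possible to tell a timeout apart from a real value.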

Removes the given obj from the shared-queue. The procedure returns #t if obj is removed, otherwise #f.

The optional argument =, which must be a comparison procedure, specifies how to compare the elements of the shared-queue with the given obj. The default value is equal?.

Clears all elements inside of shared-queue.

Finds an element which satisfies pred. This operation locks the given shared-queue.

Returns #t if the given shared-queue is locked by another thread, otherwise #f.

If the optional argument wait? is given, then the procedure waits until the queue is available and returns #f.

Actor

A sub library of (util concurrent). This library provides actor model like APIs.

An actor is an object which contains a thread, an input receiver and an output sender. This is based on the Actor model. Communication between an actor and the outside can only be done via the input receiver or the output sender. From here on, we call them channels. The following is a simple bank account example using an actor.

(import (rnrs) (util concurrent actor) (match))

(define (open-account initial-amount)
  (define balance initial-amount)
  (make-shared-queue-channel-actor
   (lambda (input-receiver output-sender)
     (let loop ()
       (match (input-receiver)
         (('withdraw how-much)
          (if (< balance how-much)
              (output-sender "invalid amount")
              (begin
                (set! balance (- balance how-much))
                (output-sender (cons how-much balance))))
          (loop))
         (('deposit a)
          (if (negative? a)
              (output-sender "invalid amount")
              (begin
                (set! balance (+ balance a))
                (output-sender (cons 0 balance))))
          (loop))
         (('close) #t)
         (else "invalid message"))))))

(define client (open-account 1000))

(actor-send-message! client '(withdraw 100))
(actor-send-message! client '(deposit 100))
(actor-send-message! client '(close))

(actor-receive-message! client) ;; => (100 . 900)
(actor-receive-message! client) ;; => (0 . 1000)

Function actor? obj

Returns #t if the given obj is an actor, otherwise #f.

Creates actors with shared-queue or shared-priority-queue as underlying channel implementation, respectively.

If make-shared-priority-queue-channel-actor is used, then compare must be a procedure which takes 2 arguments and returns the result of comparing them. The value should be -1, 0 or 1.
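A comparison procedure satisfying this contract could look like the following sketch for numeric messages. The name compare-numbers is illustrative and is not part of the library.

```scheme
(import (rnrs))

;; returns -1, 0 or 1 depending on how a compares to b
(define (compare-numbers a b)
  (cond ((< a b) -1)
        ((> a b) 1)
        (else 0)))
```

For structured messages, the same shape applies with the comparison done on whichever field determines the priority.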

task must be a procedure which accepts 2 arguments, input-receiver and output-sender. The procedures' signatures are as follows:

The input-receiver receives a message from outside of the actor.

The output-sender sends a message to the outside of the actor.

Messages can be sent to an actor via actor-send-message!, and be received from an actor via actor-receive-message!.

If the optional arguments timeout and timeout-val are given, they behave the same as in shared-queue-get! or shared-queue-put!.

Sends the given message to the actor. The operation may block the caller thread depending on the underlying channel implementation.

If the optional arguments timeout and timeout-val are given, it shall behave the same as shared-queue-put!.

Receives a message from given actor. The operation may block the caller thread depending on the underlying channel implementation.

If the optional arguments timeout and timeout-val are given, it shall behave the same as shared-queue-get!.
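The round trip of sending and receiving can be sketched with a minimal actor that echoes a single message and then finishes. The names echo and reply are illustrative.

```scheme
(import (rnrs) (util concurrent actor))

;; the task reads one message and sends it straight back
(define echo
  (make-shared-queue-channel-actor
   (lambda (input-receiver output-sender)
     (output-sender (input-receiver)))))

(actor-send-message! echo 'hello)

;; blocks until the actor has produced its reply
(define reply (actor-receive-message! echo))
```

Because the task does not loop, the actor finishes after the first message; a long-lived actor would loop as in the bank account example.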

Returns #t if the given actor is running, otherwise #f.

Waits until the given actor is finished.

The optional arguments work the same as those of thread-join!.

Terminates the given actor.

NOTE: This operation is not safe. It is the users' responsibility to release resources if needed.

Completable future

A sub library of (util concurrent). This library provides an interface like Java's CompletableFuture.

Provides a future whose value supplier is the thunk.

If the second form is used, then the execution is done by the given executor; otherwise *completable-future:default-executor* is used.

proc must accept the same number of arguments as the given futures.

Applies the procedure proc to the results of the futures and returns a newly created future.

These procedures return immediately and the computation of proc will be done in some future.
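A sketch of combining two futures with future-map follows. It assumes that these procedures are available by importing (util concurrent), as with the other sub libraries, and that the default executor is used. The names f1, f2 and sum are illustrative.

```scheme
(import (rnrs) (util concurrent))

;; two independent computations
(define f1 (thunk->future (lambda () 1)))
(define f2 (thunk->future (lambda () 2)))

;; + receives one argument per future, here the two results
(define sum (future-map + f1 f2))

;; future-map returned immediately; future-get waits for the value
(define total (future-get sum))
```

The procedure passed to future-map returns a plain value; when it needs to return another future, future-flatmap is the matching operation.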

The same as future-map; the only difference is that it takes an executor as its execution environment.

proc must accept the same number of arguments as the given futures, and return a future.

Applies the procedure proc to the results of the futures and returns a newly created future which returns the result of the future returned by proc.

These procedures return immediately and the computation of proc will be done in some future.
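The difference from future-map can be sketched as follows: here proc itself returns a future, and the resulting future yields that inner future's value. The sketch assumes that these procedures are available by importing (util concurrent). The names base, doubled and value are illustrative.

```scheme
(import (rnrs) (util concurrent))

(define base (thunk->future (lambda () 20)))

;; proc starts a second asynchronous step based on the first result
(define doubled
  (future-flatmap
   (lambda (v) (thunk->future (lambda () (* v 2))))
   base))

;; yields the value of the inner future, not the inner future itself
(define value (future-get doubled))
```

This makes it possible to chain dependent asynchronous steps without nesting calls to future-get.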

The same as future-flatmap; the only difference is that it takes an executor as its execution environment.

Guards the future and applies proc to the raised condition.

(future-get (future-guard (lambda (e) 'ok)
                          (thunk->future (lambda () (raise 'boo)))))
;; => ok

These procedures return immediately and the computation of proc will be done in some future.