(* buffer.sml *)

(* CML buffers of fixed maximum length, programmed without selective
   communication *)

structure Buffer :> BUFFER =
struct

open CML

(* datatype of requests that the server can handle; every request carries
   a private reply channel on which the server answers the requester *)

datatype 'a request =
           Del of 'a chan         (* delete request: the removed element
                                     is sent back on the channel *)
         | Add of 'a * unit chan  (* add request: () is sent back on the
                                     channel once the element is stored *)

(* buffers: a buffer is represented by the request channel of its server *)

type 'a buffer = 'a request chan

(* val server : int * 'a buffer -> 'b

   server(max, reqCh) runs forever, serving the requests arriving on
   reqCh for a buffer holding at most max elements; it never returns *)

fun server(max, reqCh) =
      let (* val loop :
                   'a Fifo.fifo * 'a chan Fifo.fifo *
                   ('a * unit chan)Fifo.fifo -> 'b

             que holds the buffered elements, recvs the reply channels of
             delete requests not yet answered, and sends the add requests
             not yet answered

             we give highest precedence to pending requests: a new request
             is read from reqCh only when no pending one can be served

             Fifo.length que is bounded above by max

             (Fifo.length recvs + Fifo.length sends) is bounded above by
             the total number of threads that are doing adds and dels,
             since add and del each wait for their reply before
             returning *)
          fun loop(que, recvs, sends) =
                if not(Fifo.isEmpty que) andalso not(Fifo.isEmpty recvs)
                then (* an element and a waiting deleter: hand the oldest
                        element to the oldest deleter *)
                     let val (que, x)       = Fifo.dequeue que
                         val (recvs, repCh) = Fifo.dequeue recvs
                     in send(repCh, x); loop(que, recvs, sends) end
                else if Fifo.length que < max andalso
                        not(Fifo.isEmpty sends)
                then (* free space and a waiting adder: store the element
                        and only then acknowledge the adder *)
                     let val (sends, (x, repCh)) = Fifo.dequeue sends
                         val que                 = Fifo.enqueue(que, x)
                     in send(repCh, ()); loop(que, recvs, sends) end
                else (* nothing pending can be served: block for the next
                        request and record it as pending *)
                     case recv reqCh of
                          Del repCh       =>
                            loop(que, Fifo.enqueue(recvs, repCh), sends)
                        | Add valAndRepCh =>
                            loop(que, recvs, Fifo.enqueue(sends, valAndRepCh))
      in loop(Fifo.empty, Fifo.empty, Fifo.empty) end

(* raised by make when the requested maximum length is less than 1 *)

exception Size

(* make max creates a new buffer of maximum length max: it spawns a server
   thread for the buffer and returns the server's request channel; raises
   Size if max < 1 *)

fun make max =
      if max < 1
      then raise Size
      else let val reqChan = channel()
           in spawn(fn () => server(max, reqChan));
              reqChan
           end

(* add(reqCh, x) asks the server of reqCh to add x to the buffer, and
   blocks until the server sends its acknowledgement *)

fun add(reqCh, x) =
      let val repChan = channel()
      in send(reqCh, Add(x, repChan));
         recv repChan
      end

(* del reqCh asks the server of reqCh for an element of the buffer, and
   blocks until the server sends one back, which is returned *)

fun del reqCh =
      let val repChan = channel()
      in send(reqCh, Del repChan);
         recv repChan
      end

end;