(* multicast.sml *) (* includes port duplication *) structure Multicast :> MULTICAST = struct open CML structure SV = SyncVar (* streams made out of I-vars *) datatype 'a stream = Cons of 'a * 'a stream SV.ivar type 'a port = {portCh : 'a chan, (* receive messages *) copyPortCh : 'a stream SV.ivar chan} (* copy port *) datatype 'a request = Msg of 'a (* incoming message *) | NewPort of 'a port SV.ivar (* create a new port *) type 'a mchan = 'a request chan (* val mkPort : 'a stream SV.ivar -> 'a port *) fun mkPort iv = let val portCh = channel() val copyPortCh = channel() (* val empty : 'a SV.ivar -> unit *) fun empty iv = select [wrap(SV.iGetEvt iv, fn Cons(v, iv') => full(iv, v, iv')), wrap(sendEvt(copyPortCh, iv), fn () => empty iv)] (* val full : 'a SV.ivar * 'a * 'a SV.ivar -> unit *) and full(iv, v, iv') = select [wrap(sendEvt(portCh, v), fn () => empty iv'), wrap(sendEvt(copyPortCh, iv), fn () => full(iv, v, iv'))] in spawn(fn () => empty iv); {portCh = portCh, copyPortCh = copyPortCh} end fun mChannel() = let val reqCh = channel() (* val server : 'a stream SV.ivar -> unit *) fun server iv = case recv reqCh of Msg v => let val iv' = SV.iVar() in SV.iPut(iv, Cons(v, iv')); server iv' end | NewPort replyIV => (SV.iPut(replyIV, mkPort iv); server iv) in spawn(fn () => server(SV.iVar())); reqCh end fun multicast(reqCh, v) = send(reqCh, Msg v) fun port reqCh = let val replyIV = SV.iVar() in send(reqCh, NewPort replyIV); SV.iGet replyIV end fun recvEvt({portCh, ...} : 'a port) = CML.recvEvt portCh fun copy({copyPortCh, ...} : 'a port) = mkPort(recv copyPortCh) end;