-exports("ring_pipe") /* * A sample reduction operator. * * ring_pipe(input_port, output_port) sums the values in input_port * and writes the sum to each element of output_port. */ ring_pipe(I,O) port s[1], I[], O[]; {|| n = nodes(), vt_lib:vtprintf("Ring_pipe on %d nodes\n",{n},_), ring_pipe1(0,n,s,I,O) } ring_pipe1(i,n,s,I,O) port s[], I[], O[]; { ? i < n -> {|| sum(I[i],O[i],s[(i+1)%n],s[i]) @ vts:node(i), ring_pipe1(i+1,n,s,I,O) } } /* A real naive sum computation! */ sum(i,o,l,r) { ? i ?= [v|i1] -> /* Receive value from input stream */ {|| loc = location(), vt_lib:vtprintf("Received %d on %t\n",{v,loc},_), n = nodes(), r = [v|r1], /* Send it on to the right neighbor */ sum1(n,v,l,r1,l1,r2,v1), /* Send messages around ring */ o = [v1|o1], /* Send result to output stream */ sum(i1,o1,l1,r2) /* Compute next sum */ }, default -> o = [] } /* * Sum over: * n = number of nodes left to sum * v = my value * l = stream from left neighbor * r = stream to right neighbor * l2 = new stream from left * r2 = new stream to right * v1 = new value * */ sum1(n,v,l,r,l2,r2,v1) { ? n > 1, l ?= [m|l1] -> /* Get value from left neighbor */ {|| r = [m+v|r1], /* Pass new partial sum to right neighbor */ sum1(n-1,v,l1,r1,l2,r2,v1) /* and recurse */ }, n == 1, l ?= [m|l1] -> {|| v1 = m, r = r2, l2 = l1 } }