Fix multi_threaded_io.go

master
Jonas Franz 4 years ago committed by Jonas Franz
parent 6d0fad42c1
commit 5e2c060a2f
  1. 17
      hal/cluster.go
  2. 22
      hal/multi_threaded_io.go

@ -17,25 +17,12 @@ func (c *Cluster) Run(ctx context.Context) error {
return errs.Wait()
}
type ModuleMapping map[MultiThreadedIOPort]MultiThreadedIOPort
func (config *ClusterConfig) BuildCluster(modules map[string]*Module) (*Cluster, error) {
// We assume all modules in the connection config exist
for _, c := range config.Connections {
// For a connection like a -> b we need two channels:
// One which is the IN channel on a and the OUT channel on b
// One which is the OUT channel on a and the IN channel on b
// That means we need to create two IOs, one for each node and two channels
// The two channels then need to be passed around
// hmmmmmmm...
outConn := &MultiThreadedIOPort{
Module: modules[c.DestNode],
IOPort: c.DestPort,
}
conn := NewMultiThreadedIO(outConn)
conn := NewMultiThreadedIO()
modules[c.SourceNode].IO[c.SourcePort] = conn
modules[c.DestNode].IO[c.DestPort] = conn
}
// for host, client := range mapping {

@ -1,31 +1,17 @@
package hal
type MultiThreadedIOPort struct {
Module *Module
IOPort int64
}
func (port *MultiThreadedIOPort) OutgoingIO() *MultiThreadedIO {
if outgoingIO, ok := port.Module.IO[port.IOPort].(*MultiThreadedIO); ok {
return outgoingIO
}
return nil
}
type MultiThreadedIO struct {
channel chan float64
OutgoingIOPort *MultiThreadedIOPort
channel chan float64
}
func NewMultiThreadedIO(outgoingPort *MultiThreadedIOPort) *MultiThreadedIO {
func NewMultiThreadedIO() *MultiThreadedIO {
return &MultiThreadedIO{
channel: make(chan float64),
OutgoingIOPort: outgoingPort,
channel: make(chan float64),
}
}
func (io *MultiThreadedIO) Read() (float64, error) {
result := <-io.OutgoingIOPort.OutgoingIO().channel
result := <-io.channel
return result, nil
}

Loading…
Cancel
Save