1 module heaploop.networking.tcp; 2 import heaploop.streams; 3 import heaploop.looping; 4 import heaploop.networking.dns; 5 import duv.c; 6 import duv.types; 7 import events; 8 9 class TcpStream : Stream 10 { 11 private: 12 bool _listening; 13 bool _connecting; 14 15 protected: 16 17 override void initializeHandle() { 18 uv_tcp_init(this.loop.handle, cast(uv_tcp_t*)this.handle).duv_last_error(this.loop.handle).completed(); 19 debug std.stdio.writeln("TCP handle initialized"); 20 } 21 22 class acceptOperationContext : OperationContext!TcpStream { 23 public: 24 TcpStream client; 25 this(TcpStream target) { 26 super(target); 27 } 28 } 29 30 public: 31 32 this() { 33 this(Loop.current); 34 } 35 36 this(Loop loop) { 37 super(loop, uv_handle_type.TCP); 38 } 39 40 @property uv_tcp_t* handle() { 41 return cast(uv_tcp_t*)super.handle; 42 } 43 alias Stream.handle handle; 44 45 void bind4(string address, int port) { 46 duv_tcp_bind4(cast(uv_tcp_t*)handle, std..string.toStringz(address), port).duv_last_error(this.loop.handle).completed(); 47 } 48 49 @property bool isListening() { 50 return _listening; 51 } 52 53 Action!(void, TcpStream) listen(int backlog) { 54 if(_listening) { 55 throw new Exception("Stream already listening"); 56 } 57 return new Action!(void, TcpStream)((trigger) { 58 _listening = true; 59 auto cx = new acceptOperationContext(this); 60 duv_listen(cast(uv_stream_t*)this.handle, backlog, cx, function (uv_stream_t* thisHandle, Object contextObj, int status) { 61 auto cx = cast(acceptOperationContext)contextObj; 62 cx.update(status); 63 if(status.isError) { 64 cx.target.close(); 65 cx.resume(); 66 return; 67 } else { 68 TcpStream client = new TcpStream(cx.target.loop); 69 int acceptStatus = uv_accept(cast(uv_stream_t*)cx.target.handle, cast(uv_stream_t*)client.handle); 70 if(acceptStatus.isError) { 71 cx.target.close(); 72 delete client; 73 } else { 74 cx.client = client; 75 } 76 cx.resume(); 77 return; 78 } 79 }).duv_last_error(this.loop.handle).completed(); 80 while(true) { 81 cx.yield; 82 cx.completed; 83 trigger(cx.client); 84 } 85 }); 86 } 87 88 @property bool isConnecting() { 89 return _connecting; 90 } 91 92 void connect(string hostname, int port) { 93 auto addresses = Dns.resolveHost(hostname); 94 NetworkAddress address = null; 95 foreach(addr; addresses) { 96 if(addr.family == AddressFamily.INETv4) { 97 address = addr; 98 break; 99 } 100 } 101 if(address is null) { 102 throw new Exception("Unable to resolve " ~ hostname); 103 } 104 this.connect(address, port); 105 } 106 107 void connect(NetworkAddress address, int port) { 108 if(address.family == AddressFamily.INETv4) { 109 std.stdio.writeln("Connecting to " ~ address.IP); 110 this.connect4(address.IP, port); 111 } else { 112 assert(0, "unable to connect to addresses other than ipv4"); 113 } 114 } 115 116 void connect4(string address, int port) { 117 if(_connecting) { 118 throw new Exception("Stream already connecting"); 119 } 120 _connecting = true; 121 scope cx = new OperationContext!TcpStream(this); 122 duv_tcp_connect4(this.handle, cx, address, port, function (uv_tcp_t* thisHandle, Object contextObj, int status) { 123 auto cx = cast(OperationContext!TcpStream)contextObj; 124 cx.update(status); 125 debug std.stdio.writeln("connect status", status); 126 if(status.isError) { 127 cx.target.close(); 128 } 129 cx.resume(); 130 }).duv_last_error(this.loop.handle).completed(); 131 cx.yield; 132 cx.completed; 133 } 134 135 } 136