1 module heaploop.networking.tcp;
2 import heaploop.streams;
3 import heaploop.looping;
4 import heaploop.networking.dns;
5 import duv.c;
6 import duv.types;
7 import events;
8
/**
 * TCP stream built on libuv through the duv bindings.
 *
 * The same class is used for both roles:
 *  - server: bind4() followed by listen(), which yields accepted clients;
 *  - client: connect() / connect4().
 */
class TcpStream : Stream
{
    private:
        bool _listening;   // true while a listen() action is active
        bool _connecting;  // true only while a connect4() call is in flight

    protected:

        /// Initializes the underlying uv_tcp_t handle on this stream's loop.
        override void initializeHandle() {
            uv_tcp_init(this.loop.handle, cast(uv_tcp_t*)this.handle).duv_last_error(this.loop.handle).completed();
            debug {
                // Local import: the module does not import std.stdio itself.
                import std.stdio : writeln;
                writeln("TCP handle initialized");
            }
        }

        /// Operation context used by listen(); carries the most recently accepted client.
        class acceptOperationContext : OperationContext!TcpStream {
            public:
                TcpStream client;
                this(TcpStream target) {
                    super(target);
                }
        }

    public:

        /// Creates a TCP stream attached to the current loop.
        this() {
            this(Loop.current);
        }

        /// Creates a TCP stream attached to the given loop.
        this(Loop loop) {
            super(loop, uv_handle_type.TCP);
        }

        /// The underlying handle, typed as a uv_tcp_t pointer.
        @property uv_tcp_t* handle() {
            return cast(uv_tcp_t*)super.handle;
        }
        alias Stream.handle handle;

        /**
         * Binds the stream to an IPv4 address and port.
         *
         * Params:
         *  address = IPv4 address to bind to
         *  port    = port to bind to
         */
        void bind4(string address, int port) {
            import std.string : toStringz;
            duv_tcp_bind4(cast(uv_tcp_t*)handle, toStringz(address), port).duv_last_error(this.loop.handle).completed();
        }

        /// Whether a listen() action is currently active on this stream.
        @property bool isListening() {
            return _listening;
        }

        /**
         * Starts listening for incoming connections.
         *
         * Params:
         *  backlog = backlog value passed through to duv_listen
         *
         * Returns: an action whose trigger is invoked once per accepted client.
         *
         * Throws: Exception if the stream is already listening.
         */
        Action!(void, TcpStream) listen(int backlog) {
            if(_listening) {
                throw new Exception("Stream already listening");
            }
            return new Action!(void, TcpStream)((trigger) {
                _listening = true;
                // Do not leave the flag stuck if duv_listen fails or the accept loop ends.
                scope(exit) _listening = false;
                auto cx = new acceptOperationContext(this);
                duv_listen(cast(uv_stream_t*)this.handle, backlog, cx, function (uv_stream_t* thisHandle, Object contextObj, int status) {
                    auto acceptCx = cast(acceptOperationContext)contextObj;
                    // Never hand a client from a previous callback to the consumer again.
                    acceptCx.client = null;
                    acceptCx.update(status);
                    if(status.isError) {
                        acceptCx.target.close();
                        acceptCx.resume();
                        return;
                    }
                    TcpStream client = new TcpStream(acceptCx.target.loop);
                    int acceptStatus = uv_accept(cast(uv_stream_t*)acceptCx.target.handle, cast(uv_stream_t*)client.handle);
                    if(acceptStatus.isError) {
                        // Record the accept failure so the waiting side can observe it.
                        // NOTE(review): assumes update()/completed report this status the
                        // same way as the listen status above - confirm against OperationContext.
                        acceptCx.update(acceptStatus);
                        acceptCx.target.close();
                        // The client's handle was already initialized; close it rather
                        // than using the deprecated `delete` on an object with a live handle.
                        client.close();
                    } else {
                        acceptCx.client = client;
                    }
                    acceptCx.resume();
                }).duv_last_error(this.loop.handle).completed();
                while(true) {
                    cx.yield;
                    cx.completed;
                    if(cx.client is null) {
                        // Accept failed and the listener was closed: nothing to deliver.
                        break;
                    }
                    trigger(cx.client);
                }
            });
        }

        /// Whether a connect4() call is currently in flight.
        @property bool isConnecting() {
            return _connecting;
        }

        /**
         * Resolves hostname and connects to its first IPv4 address.
         *
         * Throws: Exception if no IPv4 address was resolved.
         */
        void connect(string hostname, int port) {
            auto addresses = Dns.resolveHost(hostname);
            NetworkAddress address = null;
            foreach(addr; addresses) {
                if(addr.family == AddressFamily.INETv4) {
                    address = addr;
                    break;
                }
            }
            if(address is null) {
                throw new Exception("Unable to resolve " ~ hostname);
            }
            this.connect(address, port);
        }

        /**
         * Connects to an already resolved address.
         *
         * Throws: Exception if the address is not IPv4 (the only supported family).
         */
        void connect(NetworkAddress address, int port) {
            if(address.family != AddressFamily.INETv4) {
                // A caller-supplied address must not abort the process in release builds.
                throw new Exception("unable to connect to addresses other than ipv4");
            }
            debug {
                import std.stdio : writeln;
                writeln("Connecting to " ~ address.IP);
            }
            this.connect4(address.IP, port);
        }

        /**
         * Connects to an IPv4 address and waits (cx.yield) until the
         * connect callback resumes the operation.
         *
         * Throws: Exception if a connection attempt is already in flight.
         */
        void connect4(string address, int port) {
            if(_connecting) {
                throw new Exception("Stream already connecting");
            }
            _connecting = true;
            // Reset on success and failure so the stream can report its state
            // correctly and a failed attempt can be retried.
            scope(exit) _connecting = false;
            scope cx = new OperationContext!TcpStream(this);
            duv_tcp_connect4(this.handle, cx, address, port, function (uv_tcp_t* thisHandle, Object contextObj, int status) {
                auto connectCx = cast(OperationContext!TcpStream)contextObj;
                connectCx.update(status);
                debug {
                    import std.stdio : writeln;
                    writeln("connect status", status);
                }
                if(status.isError) {
                    connectCx.target.close();
                }
                connectCx.resume();
            }).duv_last_error(this.loop.handle).completed();
            cx.yield;
            cx.completed;
        }

}
136