1 module heaploop.networking.tcp;
2 import heaploop.streams;
3 import heaploop.looping;
4 import heaploop.networking.dns;
5 import duv.c;
6 import duv.types;
7 import events;
8 
/**
 * TCP stream backed by a libuv `uv_tcp_t` handle.
 *
 * Offers binding and listening for the server side and IPv4 connect
 * operations for the client side. The listen/connect operations suspend the
 * caller through an `OperationContext` (`yield`) and are resumed from the
 * libuv callback (`resume`).
 */
class TcpStream : Stream
{
    private:
        // Set once the listen action has started running.
        bool _listening;
        // True only while a connect4 call is in flight.
        bool _connecting;

    protected:

        /// Initializes the underlying `uv_tcp_t` on this stream's loop.
        override void initializeHandle() {
            uv_tcp_init(this.loop.handle, cast(uv_tcp_t*)this.handle).duv_last_error(this.loop.handle).completed();
            debug {
                import std.stdio : writeln;
                writeln("TCP handle initialized");
            }
        }

        /// Operation context for listen(); carries the most recently accepted client.
        class acceptOperationContext : OperationContext!TcpStream {
            public:
                /// Client accepted by the last connection notification, or null.
                TcpStream client;
                this(TcpStream target) {
                    super(target);
                }
        }

    public:

        /// Creates a TCP stream on the current loop.
        this() {
            this(Loop.current);
        }

        /// Creates a TCP stream on the given loop.
        this(Loop loop) {
            super(loop, uv_handle_type.TCP);
        }

        /// The underlying handle, typed as `uv_tcp_t*`.
        @property uv_tcp_t* handle() {
            return cast(uv_tcp_t*)super.handle;
        }
        alias Stream.handle handle;

        /**
         * Binds the stream to an IPv4 address and port.
         *
         * Params:
         *   address = IPv4 address, passed to libuv as a C string
         *   port    = port number
         */
        void bind4(string address, int port) {
            import std.string : toStringz;
            duv_tcp_bind4(cast(uv_tcp_t*)handle, toStringz(address), port).duv_last_error(this.loop.handle).completed();
        }

        /// Whether the listen action has been started on this stream.
        @property bool isListening() {
            return _listening;
        }

        /**
         * Builds an action that listens for incoming connections and invokes
         * its trigger with each accepted client.
         *
         * Params:
         *   backlog = backlog value forwarded to `duv_listen`
         *
         * Throws: Exception if the stream is already listening.
         */
        Action!(void, TcpStream) listen(int backlog) {
            if(_listening) {
                throw new Exception("Stream already listening");
            }
            return new Action!(void, TcpStream)((trigger) {
                _listening = true;
                auto cx = new acceptOperationContext(this);
                duv_listen(cast(uv_stream_t*)this.handle, backlog, cx, function (uv_stream_t* thisHandle, Object contextObj, int status) {
                    auto acceptCx = cast(acceptOperationContext)contextObj;
                    // Never let a client from an earlier notification leak
                    // into this one.
                    acceptCx.client = null;
                    acceptCx.update(status);
                    if(status.isError) {
                        acceptCx.target.close();
                        acceptCx.resume();
                        return;
                    }
                    TcpStream client = new TcpStream(acceptCx.target.loop);
                    int acceptStatus = uv_accept(cast(uv_stream_t*)acceptCx.target.handle, cast(uv_stream_t*)client.handle);
                    if(acceptStatus.isError) {
                        // Record the accept failure so the waiting side sees
                        // it instead of the (successful) listen status.
                        acceptCx.update(acceptStatus);
                        // The client handle is already initialized; close it
                        // rather than destroying the object with `delete`.
                        client.close();
                        acceptCx.target.close();
                    } else {
                        acceptCx.client = client;
                    }
                    acceptCx.resume();
                }).duv_last_error(this.loop.handle).completed();
                // Suspend until the callback resumes us, check the recorded
                // status, then hand the accepted client to the subscriber.
                while(true) {
                    cx.yield;
                    cx.completed;
                    trigger(cx.client);
                }
            });
        }

        /// Whether a connect operation is currently in flight.
        @property bool isConnecting() {
            return _connecting;
        }

        /**
         * Resolves `hostname` and connects to its first IPv4 address.
         *
         * Throws: Exception if no IPv4 address is found for the host.
         */
        void connect(string hostname, int port) {
            auto addresses = Dns.resolveHost(hostname);
            NetworkAddress address = null;
            foreach(addr; addresses) {
                if(addr.family == AddressFamily.INETv4) {
                    address = addr;
                    break;
                }
            }
            if(address is null) {
               throw new Exception("Unable to resolve " ~ hostname);
            }
            this.connect(address, port);
        }

        /**
         * Connects to an already resolved address. Only IPv4 is supported.
         *
         * Throws: Exception if the address family is not IPv4.
         */
        void connect(NetworkAddress address, int port) {
            if(address.family != AddressFamily.INETv4) {
                // A runtime input condition, not a programming error: report
                // it as an exception instead of halting via assert(0).
                throw new Exception("unable to connect to addresses other than ipv4");
            }
            debug {
                import std.stdio : writeln;
                writeln("Connecting to " ~ address.IP);
            }
            this.connect4(address.IP, port);
        }

        /**
         * Connects to an IPv4 address and port, suspending the caller until
         * the connect callback fires. The stream is closed if the callback
         * reports an error.
         *
         * Throws: Exception if a connect is already in progress.
         */
        void connect4(string address, int port) {
            if(_connecting) {
                throw new Exception("Stream already connecting");
            }
            _connecting = true;
            // Clear the flag on every exit path so a failed attempt does not
            // leave the stream reporting "connecting" forever.
            scope(exit) _connecting = false;
            scope cx = new OperationContext!TcpStream(this);
            duv_tcp_connect4(this.handle, cx, address, port, function (uv_tcp_t* thisHandle, Object contextObj, int status) {
                auto connectCx = cast(OperationContext!TcpStream)contextObj;
                connectCx.update(status);
                debug {
                    import std.stdio : writeln;
                    writeln("connect status", status);
                }
                if(status.isError) {
                    connectCx.target.close();
                }
                connectCx.resume();
            }).duv_last_error(this.loop.handle).completed();
            cx.yield;
            cx.completed;
        }

}
136