1 module heaploop.streams;
2 import heaploop.looping;
3 import events;
4 import duv.c;
5 import duv.types;
6 import core.thread;
7 debug {
8     import std.stdio;
9 }
10 
/**
 * Base class for handles that wrap a libuv stream (uv_stream_t).
 *
 * Offers write, continuous read, single-shot read and stop-reading
 * operations. Each operation parks the caller on an OperationContext
 * (yield) and is woken up again from the duv callback (resume).
 */
abstract class Stream : Handle {
    private:
        // True while a read started by read()/readOnce() is active.
        bool _isReading;
        // Context of the read in progress; null when no read is active.
        readOperationContext _readOperation;
        
        /// Operation context for reads: carries the last chunk and a stop flag.
        class readOperationContext : OperationContext!Stream {
            public:
                /// Data handed over by the most recent read callback.
                ubyte[] readData;
                /// Set by stopReading() so a waiting read loop knows it must end.
                bool stopped;
                this(Stream target) {
                    super(target);
                }
        }

    public:

        /**
         * Params:
         *   loop = loop forwarded to the Handle constructor
         *   type = libuv handle type forwarded to the Handle constructor
         */
        this(Loop loop, uv_handle_type type) {
            super(loop, type);
        }
        
        /// The base class handle, cast to a uv_stream_t pointer.
        @property uv_stream_t* handle() {
            return cast(uv_stream_t*)super.handle;
        }
        alias Handle.handle handle;

        /**
         * Writes data to the stream and waits until the write callback fires.
         *
         * The callback records the status in the context and closes the
         * stream if the status is an error, then resumes the waiting caller.
         * After waking up, wc.completed is invoked; in the read paths of this
         * class the same call can throw LoopException, so presumably a failed
         * write surfaces the same way (confirm against OperationContext).
         *
         * Params:
         *   data = bytes to write
         */
        void write(ubyte[] data) {
            ensureOpen;
            auto wc = new OperationContext!Stream(this);
            duv_write(this.handle, wc, data, function (uv_stream_t * thisHandle, contextObj, status writeStatus) {
                    // This local wc is the context handed back by duv, not the outer variable.
                    auto wc = cast(OperationContext!Stream)contextObj;
                    wc.update(writeStatus);
                    if(writeStatus.isError) {
                        // we must close immediately when we receive the error, before continuing in the fiber
                        wc.target.close();
                    }
                    wc.resume();
            });
            // destroy() replaces the deprecated `delete` operator: the context is
            // finalized on exit and its memory is left to the garbage collector.
            scope (exit) destroy(wc);
            wc.yield;
            wc.completed;
            debug std.stdio.writeln("Write completed");
        }

        /// Whether a read started by read() or readOnce() is currently active.
        @property bool isReading() pure nothrow {
            return _isReading;
        }

        /**
         * Creates an action that reads continuously from the stream.
         *
         * When the action body runs it starts reading and then loops: it
         * yields, and on every wake-up either delivers the received chunk to
         * trigger or leaves the loop because the read was stopped. A
         * LoopException named "EOF" closes the stream and ends the loop; any
         * other LoopException closes the stream and is rethrown.
         *
         * Returns: an Action whose trigger receives each chunk read.
         * Throws: Exception from the action body if the stream is already
         *         reading (same rule as readOnce()); LoopException for read
         *         errors other than EOF.
         */
        Action!(void, ubyte[]) read() {
            ensureOpen;
            return new Action!(void, ubyte[])((trigger) {
                // Same guard as readOnce(): a second concurrent read would
                // overwrite _readOperation and start reading on the handle twice.
                if(_isReading) {
                    throw new Exception("Stream is already reading");
                }
                _isReading = true;
                auto rx = _readOperation = new readOperationContext(this);
                duv_read_start(this.handle, rx, (uv_stream_t * client_conn, Object readContext, ptrdiff_t nread, ubyte[] data) {
                        int status = cast(int)nread;
                        // This local rx is the context handed back by duv, not the outer variable.
                        auto rx = cast(readOperationContext)readContext;
                        rx.update(status);
                        rx.readData = data;
                        if(status.isError) {
                            // we must close immediately when we receive the error, before continuing in the fiber
                            rx.target.close();
                        }
                        rx.resume();
                });
                scope (exit) stopReading();
                while(!rx.stopped) {
                    debug std.stdio.writeln("read (activated block) will yield");
                    rx.yield;
                    debug std.stdio.writeln("read (activated block) continue after yield");
                    if(!rx.stopped) {
                        try {
                            rx.completed;
                        } catch(LoopException lex) {
                            if(lex.name == "EOF") {
                                debug std.stdio.writeln("EOF detected, forcing close");
                                close();
                                break;
                            } else {
                                debug std.stdio.writeln("error detected, forcing close");
                                close();
                                throw lex;
                            }
                        }
                        trigger(rx.readData);
                    } else {
                        debug std.stdio.writeln("read was stopped, breaking read loop");
                        break;
                    }
                }
                // Cleared before the scope(exit) above runs, so on the normal
                // exit path stopReading() has no context left to resume.
                _readOperation = null;
            });
        }

        /**
         * Performs a single read and returns the chunk received.
         *
         * Starts reading, yields until the read callback resumes the caller,
         * and stops reading on exit. A LoopException named "EOF" closes the
         * stream and the method still returns whatever readData holds; any
         * other LoopException closes the stream and is rethrown.
         *
         * Returns: the data stored by the read callback.
         * Throws: Exception if the stream is already reading; LoopException
         *         for read errors other than EOF.
         */
        ubyte[] readOnce() {
            ensureOpen;
            if(_isReading) {
                throw new Exception("Stream is already reading");
            }
            _isReading = true;
            auto rx = _readOperation = new readOperationContext(this);
            duv_read_start(this.handle, rx, (uv_stream_t * client_conn, Object readContext, ptrdiff_t nread, ubyte[] data) {
                    int status = cast(int)nread;
                    // This local rx is the context handed back by duv, not the outer variable.
                    auto rx = cast(readOperationContext)readContext;
                    rx.update(status);
                    rx.readData = data;
                    if(status.isError) {
                        // we must close immediately when we receive the error, before continuing in the fiber
                        rx.target.close();
                    }
                    rx.resume();
            });
            scope (exit) stopReading();
            debug std.stdio.writeln("read (activated block) will yield");
            rx.yield;
            debug std.stdio.writeln("read (activated block) continue after yield");
            try {
                rx.completed;
            } catch(LoopException lex) {
                if(lex.name == "EOF") {
                    debug std.stdio.writeln("EOF detected, forcing close");
                    close();
                } else {
                    debug std.stdio.writeln("error detected, forcing close");
                    close();
                    throw lex;
                }
            }
            return rx.readData;
        }

        /**
         * Stops an active read; does nothing when the stream is not reading.
         *
         * Stops reading on the handle, clears the reading flag and, if a read
         * context is still registered, marks it stopped, resumes it and then
         * forgets it.
         */
        void stopReading() {
            // [WARNING] might be executed from fiber or thread
            if(_isReading) {
                debug std.stdio.writeln("stopReading");
                duv_read_stop(this.handle);
                _isReading = false;
                if(_readOperation) {
                    _readOperation.stopped = true;
                    _readOperation.resume;
                    _readOperation = null;
                }
            }
        }

    protected:
        /// Close hook: stops any active read. The async flag is not used here.
        override void closeCleanup(bool async) {
            stopReading();
        }
}