1 module heaploop.streams; 2 import heaploop.looping; 3 import events; 4 import duv.c; 5 import duv.types; 6 import core.thread; 7 debug { 8 import std.stdio; 9 } 10 11 abstract class Stream : Handle { 12 private: 13 bool _isReading; 14 readOperationContext _readOperation; 15 16 class readOperationContext : OperationContext!Stream { 17 public: 18 ubyte[] readData; 19 bool stopped; 20 this(Stream target) { 21 super(target); 22 } 23 } 24 25 public: 26 27 this(Loop loop, uv_handle_type type) { 28 super(loop, type); 29 } 30 31 @property uv_stream_t* handle() { 32 return cast(uv_stream_t*)super.handle; 33 } 34 alias Handle.handle handle; 35 36 void write(ubyte[] data) { 37 ensureOpen; 38 auto wc = new OperationContext!Stream(this); 39 duv_write(this.handle, wc, data, function (uv_stream_t * thisHandle, contextObj, status writeStatus) { 40 auto wc = cast(OperationContext!Stream)contextObj; 41 wc.update(writeStatus); 42 if(writeStatus.isError) { 43 // we must close inmediately we receive the error and before continue in the fiber 44 wc.target.close(); 45 } 46 wc.resume(); 47 }); 48 scope (exit) delete wc; 49 wc.yield; 50 wc.completed; 51 debug std.stdio.writeln("Write completed"); 52 } 53 54 @property bool isReading() pure nothrow { 55 return _isReading; 56 } 57 58 Action!(void, ubyte[]) read() { 59 ensureOpen; 60 return new Action!(void, ubyte[])((trigger) { 61 _isReading = true; 62 auto rx = _readOperation = new readOperationContext(this); 63 duv_read_start(this.handle, rx, (uv_stream_t * client_conn, Object readContext, ptrdiff_t nread, ubyte[] data) { 64 int status = cast(int)nread; 65 auto rx = cast(readOperationContext)readContext; 66 rx.update(status); 67 Stream thisStream = rx.target; 68 rx.readData = data; 69 if(status.isError) { 70 // we must close inmediately we receive the error and before continue in the fiber 71 rx.target.close(); 72 } 73 rx.resume(); 74 }); 75 scope (exit) stopReading(); 76 while(!rx.stopped) { 77 debug std.stdio.writeln("read (activated block) will yield"); 78 rx.yield; 79 debug std.stdio.writeln("read (activated block) continue after yield"); 80 if(!rx.stopped) { 81 try { 82 rx.completed; 83 } catch(LoopException lex) { 84 if(lex.name == "EOF") { 85 debug std.stdio.writeln("EOF detected, forcing close"); 86 close(); 87 break; 88 } else { 89 debug std.stdio.writeln("read detected, forcing close"); 90 close(); 91 throw lex; 92 } 93 } 94 trigger(rx.readData); 95 } else { 96 debug std.stdio.writeln("read was stopped, breaking read loop"); 97 break; 98 } 99 } 100 _readOperation = null; 101 }); 102 } 103 104 ubyte[] readOnce() { 105 ensureOpen; 106 if(_isReading) { 107 throw new Exception("Stream is already reading"); 108 } 109 _isReading = true; 110 auto rx = _readOperation = new readOperationContext(this); 111 duv_read_start(this.handle, rx, (uv_stream_t * client_conn, Object readContext, ptrdiff_t nread, ubyte[] data) { 112 int status = cast(int)nread; 113 auto rx = cast(readOperationContext)readContext; 114 rx.update(status); 115 Stream thisStream = rx.target; 116 rx.readData = data; 117 if(status.isError) { 118 // we must close inmediately we receive the error and before continue in the fiber 119 rx.target.close(); 120 } 121 rx.resume(); 122 }); 123 scope (exit) stopReading(); 124 debug std.stdio.writeln("read (activated block) will yield"); 125 rx.yield; 126 debug std.stdio.writeln("read (activated block) continue after yield"); 127 try { 128 rx.completed; 129 } catch(LoopException lex) { 130 if(lex.name == "EOF") { 131 debug std.stdio.writeln("EOF detected, forcing close"); 132 close(); 133 } else { 134 debug std.stdio.writeln("error detected, forcing close"); 135 close(); 136 throw lex; 137 } 138 } 139 return rx.readData; 140 } 141 142 void stopReading() { 143 // [WARNING] might be executed from fiber or thread 144 if(_isReading) { 145 debug std.stdio.writeln("stopReading"); 146 duv_read_stop(this.handle); 147 _isReading = false; 148 if(_readOperation) { 149 _readOperation.stopped = true; 150 _readOperation.resume; 151 _readOperation = null; 152 } 153 } 154 } 155 156 protected: 157 override void closeCleanup(bool async) { 158 stopReading(); 159 } 160 }