The OpenD Programming Language

1 module requests.streams;
2 
3 private:
4 import std.algorithm;
5 import std.array;
6 import std.conv;
7 import std.experimental.logger;
8 import std.exception;
9 import std.format;
10 import std.range;
11 import std.range.primitives;
12 import std.string;
13 import std.stdio;
14 import std.traits;
15 import std.zlib;
16 import std.datetime;
17 import std.socket;
18 import core.stdc.errno;
19 import core.stdc.string;
20 
21 import requests.ssl_adapter : openssl, SSL, SSL_CTX;
22 
23 alias InDataHandler = DataPipeIface!ubyte;
24 
/// Exception type named for connection errors; it adds nothing to the
/// standard message/file/line/next payload of `Exception`.
public class ConnectError : Exception {
    this(string message,
         string file = __FILE__,
         size_t line = __LINE__,
         Throwable next = null) @safe pure nothrow
    {
        super(message, file, line, next);
    }
}
30 
/// Exception thrown by the stream decoders in this module (for example when
/// a chunk size cannot be found, or when a pipeline stage fails).
class DecodingException : Exception {
    this(string message,
         string file = __FILE__,
         size_t line = __LINE__,
         Throwable next = null) @safe pure nothrow
    {
        super(message, file, line, next);
    }
}
36 
/// Exception type named for timeouts; it adds nothing to the standard
/// message/file/line/next payload of `Exception`.
public class TimeoutException : Exception {
    this(string message,
         string file = __FILE__,
         size_t line = __LINE__,
         Throwable next = null) @safe pure nothrow
    {
        super(message, file, line, next);
    }
}
42 
/// Exception type named for network-level failures; it adds nothing to the
/// standard message/file/line/next payload of `Exception`.
public class NetworkException : Exception {
    this(string message,
         string file = __FILE__,
         size_t line = __LINE__,
         Throwable next = null) @safe pure nothrow
    {
        super(message, file, line, next);
    }
}
48 
/**
 * One stage of a data-processing pipeline: it is fed input portions,
 * processes them, and hands processed output back on request.
 */
public interface DataPipeIface(E) {
    /// Returns: true when there is no processed data waiting to be fetched with get().
    bool empty();

    /// Feed the next portion of input for processing.
    /// (A copying `put(E[])` variant is intentionally not part of the interface.)
    void putNoCopy(E[]);

    /// Fetch processed data that is ready.
    E[] get();

    /// Signal the end of the incoming data stream.
    void flush();
}
/**
 * DataPipe is a pipeline of data processors: each one accepts some data, processes it,
 * and its output is fed to the next processor in line. The output of the last
 * processor is collected in an internal buffer.
 * This class is used to combine different Transfer- and Content- encodings, for example:
 * unchunk Transfer-Encoding "chunked" and then uncompress Content-Encoding "gzip".
 */
public class DataPipe(E) : DataPipeIface!E {

    DataPipeIface!(E)[]  pipe;
    Buffer!E             buffer;

    /// Append data processor to pipeline
    /// Params:
    /// p = processor
    final void insert(DataPipeIface!E p) {
        pipe ~= p;
    }

    /// Feed every portion of `data` to processor `p`, then drain `p`.
    /// Params:
    /// p = processor to run
    /// data = input portions, passed to `p.putNoCopy` in order
    /// Returns:
    /// everything `p.get()` produced until `p.empty()` became true.
    final E[][] process(DataPipeIface!E p, E[][] data) {
        E[][] result;
        foreach (portion; data) {
            p.putNoCopy(portion);
        }
        while ( !p.empty() ) {
            result ~= p.get();
        }
        return result;
    }

    /// Process next data portion. Data is passed over the pipeline and the result is stored in the buffer.
    /// With an empty pipeline the data goes straight to the buffer.
    /// Params:
    /// data = input data buffer.
    /// NoCopy means we do not copy data to buffer, we keep reference
    /// Throws:
    /// DecodingException if any stage throws an Exception; the original
    /// exception is kept as `next` so the root cause is not lost.
    final void putNoCopy(E[] data) {
        if ( pipe.empty ) {
            buffer.putNoCopy(data);
            return;
        }
        try {
            auto t = process(pipe.front, [data]);
            foreach(ref p; pipe[1..$]) {
                t = process(p, t);
            }
            foreach (b; t) {
                buffer.putNoCopy(b);
            }
        }
        catch (Exception e) {
            // chain the cause instead of dropping it
            throw new DecodingException(e.msg, __FILE__, __LINE__, e);
        }
    }

    /// Get what was collected in internal buffer and clear it.
    /// Returns:
    /// data collected.
    final E[] get() {
        if ( buffer.empty ) {
            return E[].init;
        }
        auto res = buffer.data;
        buffer = Buffer!E.init;
        return res;
    }

    ///
    /// get without datamove. but user receive [][]
    ///
    final E[][] getNoCopy()  {
        if ( buffer.empty ) {
            return E[][].init;
        }
        E[][] res = buffer.__repr.__buffer;
        buffer = Buffer!E.init;
        return res;
    }

    /// Test if internal buffer is empty
    /// Returns:
    /// true if internal buffer is empty (nothing to get())
    final bool empty() pure const @safe {
        return buffer.empty;
    }

    /// Signal end of input: each stage receives the previous stage's final
    /// output, is flushed, and is drained; the last stage's output is stored
    /// in the internal buffer.
    final void flush() {
        E[][] product;
        foreach(ref p; pipe) {
            foreach (portion; product) {
                p.putNoCopy(portion);
            }
            p.flush();
            product.length = 0;
            while( !p.empty ) product ~= p.get();
        }
        foreach (b; product) {
            buffer.putNoCopy(b);
        }
    }
}
143 
/**
 * Processor for gzipped/compressed content, backed by std.zlib.UnCompress.
 * Also supports the InputRange interface (empty/front/popFront) over the
 * decompressed bytes.
 */
public class Decompressor(E) : DataPipeIface!E {
    private {
        Buffer!ubyte __buff;   // decompressed output waiting to be read
        UnCompress   __zlib;   // null once the stream has been flushed
    }
    this() {
        __buff = Buffer!ubyte();
        __zlib = new UnCompress();
    }
    /// Decompress `data` and queue the result.
    /// A new UnCompress is created if the previous one was already flushed.
    final override void putNoCopy(E[] data) {
        if ( __zlib is null  ) {
            __zlib = new UnCompress();
        }
        __buff.putNoCopy(__zlib.uncompress(data));
    }
    /// Returns: the first queued chunk of decompressed data, or null if
    /// nothing is queued.
    final override E[] get() pure {
        if ( __buff.empty ) {
            return null;
        }
        auto r = __buff.__repr.__buffer[0];
        __buff.popFrontN(r.length);
        return cast(E[])r;
    }
    /// Finish the zlib stream and queue whatever it still holds.
    /// The UnCompress object is dropped afterwards (even if finishing throws),
    /// so a repeated flush() is a no-op instead of re-flushing a finished stream.
    final override void flush() {
        if ( __zlib is null  ) {
            return;
        }
        scope(exit) __zlib = null;
        __buff.put(__zlib.flush());
    }
    final override @property bool empty() const pure @safe {
        debug(requests) tracef("empty=%b", __buff.empty);
        return __buff.empty;
    }
    final @property auto ref front() pure const @safe {
        debug(requests) tracef("front: buff length=%d", __buff.length);
        return __buff.front;
    }
    final @property auto popFront() pure @safe {
        debug(requests) tracef("popFront: buff length=%d", __buff.length);
        return __buff.popFront;
    }
    final @property void popFrontN(size_t n) pure @safe {
        __buff.popFrontN(n);
    }
}
191 
/**
 * Unchunk a chunked http response body.
 *
 * Chunk payload is queued without copying (slices of the caller's buffer);
 * only the chunk-size lines are copied into a small line buffer.
 */
public class DecodeChunked : DataPipeIface!ubyte {
    //    length := 0
    //    read chunk-size, chunk-extension (if any) and CRLF
    //    while (chunk-size > 0) {
    //        read chunk-data and CRLF
    //        append chunk-data to entity-body
    //        length := length + chunk-size
    //        read chunk-size and CRLF
    //    }
    //    read entity-header
    //    while (entity-header not empty) {
    //        append entity-header to existing header fields
    //        read entity-header
    //    }
    //    Content-Length := length
    //    Remove "chunked" from Transfer-Encoding
    //

    //    Chunked-Body   = *chunk
    //                      last-chunk
    //                      trailer
    //                      CRLF
    //
    //    chunk          = chunk-size [ chunk-extension ] CRLF
    //                     chunk-data CRLF
    //                     chunk-size     = 1*HEX
    //                     last-chunk     = 1*("0") [ chunk-extension ] CRLF
    //
    //    chunk-extension= *( ";" chunk-ext-name [ "=" chunk-ext-val ] )
    //    chunk-ext-name = token
    //    chunk-ext-val  = token | quoted-string
    //    chunk-data     = chunk-size(OCTET)
    //    trailer        = *(entity-header CRLF)

    alias eType = ubyte;
    immutable eType[] CRLF = ['\r', '\n'];
    private {
        enum         States {huntingSize, huntingSeparator, receiving, trailer}
        States       state = States.huntingSize;
        size_t       chunk_size, to_receive;
        Buffer!ubyte buff;
        ubyte[]      linebuff;
    }

    /// Parse the hexadecimal chunk size from a chunk-size line.
    /// Only the part before an optional ";" (chunk-extension) is considered;
    /// within that part non-hex characters (CR, LF, blanks) are skipped.
    /// Throws: DecodingException if there are no hex digits or the value
    /// does not fit into size_t.
    private static size_t parseChunkSize(const(ubyte)[] line) {
        import std.ascii : isHexDigit;
        size_t size = 0;
        bool   seenDigit = false;
        foreach (c; line) {
            if ( c == ';' ) {
                break;      // chunk-extension starts here, it is not part of the size
            }
            if ( !isHexDigit(c) ) {
                continue;
            }
            if ( size > (size_t.max >> 4) ) {
                throw new DecodingException("Chunk size is too large");
            }
            immutable uint digit = c <= '9' ? c - '0' : (c | 0x20) - 'a' + 10;
            size = (size << 4) | digit;
            seenDigit = true;
        }
        if ( !seenDigit ) {
            throw new DecodingException("Can't find chunk size in the body");
        }
        return size;
    }

    /// Feed the next portion of the chunked body.
    /// Throws: DecodingException if no valid chunk-size line is found within
    /// 80 bytes.
    final void putNoCopy(eType[] data) {
        while ( data.length ) {
            if ( state == States.trailer ) {
                // only count down the final CRLF; trailer headers are not parsed
                to_receive = to_receive - min(to_receive, data.length);
                return;
            }
            if ( state == States.huntingSize ) {
                // take bytes up to and including the first '\n' (or all of data)
                size_t consumed = 0;
                while ( consumed < data.length ) {
                    if ( data[consumed++] == '\n' ) {
                        break;
                    }
                }
                linebuff ~= data[0..consumed];
                if ( linebuff.length >= 80 ) {
                    throw new DecodingException("Can't find chunk size in the body");
                }
                data = data[consumed..$];
                if (!linebuff.canFind(CRLF)) {
                    continue;
                }
                chunk_size = parseChunkSize(linebuff);
                state = States.receiving;
                to_receive = chunk_size;
                if ( chunk_size == 0 ) {
                    to_receive = 2-min(2, data.length); // trailing \r\n
                    state = States.trailer;
                    return;
                }
                continue;
            }
            if ( state == States.receiving ) {
                if (to_receive > 0 ) {
                    auto can_store = min(to_receive, data.length);
                    buff.putNoCopy(data[0..can_store]);
                    data = data[can_store..$];
                    to_receive -= can_store;
                    if ( to_receive == 0 ) {
                        state = States.huntingSeparator;
                    }
                    continue;
                }
                // receiving is only entered with chunk_size > 0
                assert(false);
            }
            if ( state == States.huntingSeparator ) {
                if ( data[0] == '\n' || data[0]=='\r') {
                    data = data[1..$];
                    continue;
                }
                state = States.huntingSize;
                linebuff.length = 0;
                continue;
            }
        }
    }
    /// Returns: the first queued piece of chunk data, or null if nothing is queued.
    final eType[] get() {
        if ( buff.empty ) {
            return null;
        }
        auto r = buff.__repr.__buffer[0];
        buff.popFrontN(r.length);
        return r;
    }
    /// Nothing to do: decoded data is queued as soon as it arrives.
    final void flush() {
    }
    final bool empty() {
        debug(requests) tracef("empty=%b", buff.empty);
        return buff.empty;
    }
    /// Returns: true once the last (zero-size) chunk and its trailing CRLF were seen.
    final bool done() {
        return state==States.trailer && to_receive==0;
    }
}
316 
// Exercises Decompressor, DataPipe and DecodeChunked.
unittest {
    info("Testing DataPipe");
    globalLogLevel(LogLevel.info);
    alias eType = char;
    // gzip stream for "abc\ndef\n"
    eType[] gzipped = [
        0x1F, 0x8B, 0x08, 0x00, 0xB1, 0xA3, 0xEA, 0x56,
        0x00, 0x03, 0x4B, 0x4C, 0x4A, 0xE6, 0x4A, 0x49,
        0x4D, 0xE3, 0x02, 0x00, 0x75, 0x0B, 0xB0, 0x88,
        0x08, 0x00, 0x00, 0x00
    ];

    // decompressor fed in three pieces, consumed as a range
    auto threePieces = new Decompressor!eType();
    threePieces.putNoCopy(gzipped[0..2].dup);
    threePieces.putNoCopy(gzipped[2..10].dup);
    threePieces.putNoCopy(gzipped[10..$].dup);
    threePieces.flush();
    assert(equal(threePieces.filter!(a => a!='b'), "ac\ndef\n"));

    // decompressor fed in two pieces
    auto twoPieces = new Decompressor!eType();
    twoPieces.putNoCopy(gzipped[0..10].dup);
    twoPieces.putNoCopy(gzipped[10..$].dup);
    twoPieces.flush();
    assert(equal(twoPieces.filter!(a => a!='b'), "ac\ndef\n"));

    // the same decompressor behind a DataPipe
    auto gzPipe = new DataPipe!eType;
    gzPipe.insert(new Decompressor!eType());
    gzPipe.putNoCopy(gzipped[0..2].dup);
    gzPipe.putNoCopy(gzipped[2..$].dup);
    gzPipe.flush();
    assert(equal(gzPipe.get(), "abc\ndef\n"));

    // a DataPipe without processors just passes input to output
    auto passThrough = new DataPipe!ubyte;
    passThrough.putNoCopy("abcd".dup.representation);
    passThrough.putNoCopy("efgh".dup.representation);
    passThrough.flush();
    assert(equal(passThrough.get(), "abcdefgh"));

    info("Test unchunker properties");
    ubyte[] twoChunks = "2\r\n12\r\n2\r\n34\r\n0\r\n\r\n".dup.representation;
    ubyte[][] pieces;
    auto unchunker = new DecodeChunked();
    unchunker.putNoCopy(twoChunks);
    while ( !unchunker.empty ) {
        pieces ~= unchunker.get();
    }
    assert(equal(pieces[0], ['1', '2']));
    assert(equal(pieces[1], ['3', '4']));
    info("unchunker correctness - ok");

    // decoded pieces are slices of the input, so writes show through
    pieces[0][0] = '5';
    assert(twoChunks[3] == '5');
    info("unchunker zero copy - ok");
    info("Testing DataPipe - done");
}
368 
/**
 * Splits the incoming byte stream into lines. Each get() returns one line
 * including its terminating '\n'. After flush() the unterminated tail (if any)
 * is returned as the last line.
 */
public class LineSplitter : DataPipeIface!ubyte
{
    private
    {
        Buffer!ubyte    buff;
        int             NL = -1;     // index of the end of the next line to return, -1 if none
        int             scanned;     // number of buffered bytes already searched for '\n'
        bool            flushed;     // flush() was called and no data arrived since
    }
    /// Returns: true when there is no line ready for get().
    bool empty()
    {
        return NL == -1;
    }
    /// Append `s` to the buffer (no copy) and look for a line end in the new part.
    void putNoCopy(ubyte[] s)
    {
        debug(requests) tracef("put data: <<%s>>", cast(string)s);
        buff.putNoCopy(s);
        if (s.length)
        {
            flushed = false;
        }
        if (NL == -1)
        {
            auto new_NL = buff.indexOf(cast(ubyte)'\n', scanned);
            NL = new_NL;
            debug(requests) tracef("new nl=%d", NL);
        }
        scanned = cast(int)buff.length;
    }
    /// Returns: the next line, or an empty array if no line is ready.
    ubyte[] get()
    {
        if (NL < 0)
        {
            return new ubyte[](0);
        }
        auto res = buff[0..NL+1].data;
        buff.popFrontN(NL+1);
        NL = buff.indexOf(cast(ubyte)'\n', 0);
        if (NL == -1)
        {
            scanned = cast(int)buff.length;
            if (flushed && scanned > 0)
            {
                // input is finished: the unterminated tail is the last line
                NL = scanned - 1;
            }
        }
        return res;
    }
    /// Signal end of input. Lines that are still pending keep their
    /// boundaries; the unterminated tail becomes available once they
    /// have been fetched.
    void flush()
    {
        flushed = true;
        if (NL == -1)
        {
            NL = cast(int)buff.length - 1;
        }
    }
}
// Exercises LineSplitter: partial lines, several lines per put, flush of the tail.
unittest
{
    info("testing LineSplitter");
    LineSplitter splitter = new LineSplitter();

    // a line that arrives in two parts
    splitter.putNoCopy("abc".representation.dup);
    assert(splitter.empty);
    splitter.putNoCopy("\n".representation.dup);
    assert(!splitter.empty);
    assert(equal(splitter.get, "abc\n"));
    assert(splitter.empty);
    assert(equal(splitter.get,""));

    // two complete lines followed by an unterminated tail
    splitter.putNoCopy("def\n".representation.dup);
    splitter.putNoCopy("ghi\njk".representation.dup);
    assert(!splitter.empty);
    assert(equal(splitter.get, "def\n"));
    assert(!splitter.empty);
    assert(equal(splitter.get, "ghi\n"));
    assert(splitter.empty);

    // flush releases the tail
    splitter.flush();
    assert(equal(splitter.get, "jk"));
}
enum   CACHESIZE = 1024;

static long reprAlloc;
static long reprCacheHit;
static long reprCacheRequests;


/**
 * Buffer used to collect and process data from network. It resembles Appender, but
 * also supports the Range interface. Data is kept as a list of chunks.
 * $(P To place data in buffer use put() (copies) or putNoCopy() (keeps a reference).)
 * $(P  To retrieve data from buffer you can use several methods:)
 * $(UL
 *  $(LI Range methods: front, back, index [])
 *  $(LI data method: return collected data (like Appender.data))
 * )
 */
public struct Buffer(T) {

    private {
        /// Allocate a new representation object (the Repr cache is not in use).
        Repr  cachedOrNew() {
            return new Repr;
        }
        /// Shared state of a buffer: the chunk list and the total element count.
        class Repr {
            size_t         __length;
            Unqual!T[][]   __buffer;
            this() {
                reprAlloc++;
                __length = 0;
            }
            /// Copy the chunk list (not the chunk contents) of `other`.
            this(Repr other) {
                reprAlloc++;
                if ( other is null )
                    return;
                __length = other.__length;
                __buffer = other.__buffer.dup;
            }
        }
        Repr __repr;    // null until the first non-empty put
    }

    alias toString = data!string;

    /// Postblit: a copy gets its own chunk list.
    this(this) {
        if ( !__repr ) {
            return;
        }
        __repr = new Repr(__repr);
    }
    this(U)(U[] data) {
        put(data);
    }
    ~this() {
        __repr = null;
    }
    /***************
     * store data. Data copied
     */
    auto put(U)(U[] data) {
        if ( data.length == 0 ) {
            return;
        }
        if ( !__repr ) {
            __repr = cachedOrNew();
        }
        static if (!is(U == T)) {
            auto d = cast(T[])(data);
            __repr.__length += d.length;
            __repr.__buffer ~= d.dup;
        } else {
            __repr.__length += data.length;
            __repr.__buffer ~= data.dup;
        }
        return;
    }
    /***************
     * store data. Data is not copied, the buffer keeps a reference to it
     */
    auto putNoCopy(U)(U[] data) {
        if ( data.length == 0 ) {
            return;
        }
        if ( !__repr ) {
            __repr = cachedOrNew();
        }
        static if (!is(U == T)) {
            auto d = cast(T[])(data);
            __repr.__length += d.length;
            __repr.__buffer ~= d;
        } else {
            __repr.__length += data.length;
            __repr.__buffer ~= data;
        }
        return;
    }
    /// `$` inside index/slice expressions; 0 for a buffer that never stored data.
    @property auto opDollar() const pure @safe {
        return length;
    }
    @property size_t length() const pure @safe {
        if ( !__repr ) {
            return 0;
        }
        return __repr.__length;
    }
    @property auto empty() const pure @safe {
        return length == 0;
    }
    @property auto ref front() const pure @safe {
        assert(length);
        return __repr.__buffer.front.front;
    }
    @property auto ref back() const pure @safe {
        assert(length);
        return __repr.__buffer.back.back;
    }
    @property void popFront() pure @safe {
        assert(length);
        with ( __repr ) {
            __buffer.front.popFront;
            if ( __buffer.front.length == 0 ) {
                __buffer.popFront;
            }
            __length--;
        }
    }
    /// Drop `n` elements from the front; `n` must not exceed length.
    @property void popFrontN(size_t n) pure @safe {
        assert(n <= length, "lengnt: %d, n=%d".format(length, n));
        if ( n == 0 ) {
            // nothing to drop; also keeps a never-used buffer (null repr) safe
            return;
        }
        __repr.__length -= n;
        while( n ) {
            if ( n <= __repr.__buffer.front.length ) {
                __repr.__buffer.front.popFrontN(n);
                if ( __repr.__buffer.front.length == 0 ) {
                    __repr.__buffer.popFront;
                }
                return;
            }
            n -= __repr.__buffer.front.length;
            __repr.__buffer.popFront;
        }
    }
    @property void popBack() pure @safe {
        assert(length);
        __repr.__buffer.back.popBack;
        if ( __repr.__buffer.back.length == 0 ) {
            __repr.__buffer.popBack;
        }
        __repr.__length--;
    }
    /// Drop `n` elements from the back; `n` must not exceed length.
    @property void popBackN(size_t n) pure @safe {
        assert(n <= length, "n: %d, length: %d".format(n, length));
        if ( n == 0 ) {
            // nothing to drop; also keeps a never-used buffer (null repr) safe
            return;
        }
        __repr.__length -= n;
        while( n ) {
            if ( n <= __repr.__buffer.back.length ) {
                __repr.__buffer.back.popBackN(n);
                if ( __repr.__buffer.back.length == 0 ) {
                    __repr.__buffer.popBack;
                }
                return;
            }
            n -= __repr.__buffer.back.length;
            __repr.__buffer.popBack;
        }
    }
    /// Returns: a buffer with its own copy of the chunk list.
    @property auto save() @safe {
        auto n = Buffer!T();
        n.__repr = new Repr(__repr);
        return n;
    }
    @property auto ref opIndex(size_t n) const pure @safe {
        assert( __repr && n < __repr.__length );
        foreach(b; __repr.__buffer) {
            if ( n < b.length ) {
                return b[n];
            }
            n -= b.length;
        }
        assert(false, "Impossible");
    }
    /// Find `needle` starting at position `pos`.
    /// Returns: index of the first match counted from the start of the
    /// buffer, or -1 if there is none.
    int indexOf(ubyte needle, int pos)
    {
        if ( pos >= length || !__repr || !__repr.__buffer)
        {
            return -1;
        }
        // skip pos bytes
        debug(requests) tracef("search %d from pos %d", needle, pos);
        int cp;     // number of elements in the chunks already passed
        foreach (ref b; __repr.__buffer)
        {
            if ( pos >= b.length )
            {
                pos -= b.length;
                cp += b.length;
                continue;
            }
            auto i = b[pos..$].countUntil(needle);
            if (i>=0)
            {
                // i is relative to b[pos..$], so add pos back to get the buffer index
                debug(requests) tracef("found at %d", i+cp+pos);
                return cast(int)i+cp+pos;
            }
            pos = 0;
            cp += b.length;
        }
        return -1;
    }
    /// Returns: a new buffer holding elements [m, n).
    Buffer!T opSlice(size_t m, size_t n) {
        if ( empty || m == n ) {
            return Buffer!T();
        }
        assert( m <= n && n <= __repr.__length);
        auto res = this.save();
        res.popBackN(res.__repr.__length-n);
        res.popFrontN(m);
        return res;
    }
    /// Returns: collected data as one array. With exactly one chunk and
    /// U == T[] the chunk itself is returned (no copy); otherwise the
    /// chunks are joined into a new array.
    @property auto data(U=T[])() pure {
        static if ( is(U==T[]) ) {
            if ( __repr && __repr.__buffer && __repr.__buffer.length == 1 ) {
                return __repr.__buffer.front;
            }
        }
        Appender!(T[]) a;
        if ( __repr && __repr.__buffer ) {
            foreach(ref b; __repr.__buffer) {
                a.put(b);
            }
        }
        static if ( is(U==T[]) ) {
            return a.data;
        } else {
            return cast(U)a.data;
        }
    }
    // NOTE: `string` here is the template parameter name, not the built-in type.
    string opCast(string)() {
        return this.toString;
    }
    bool opEquals(U)(U x) {
        return cast(U)this == x;
    }

}
/// Buffer: range properties, element access, searching and slicing.
public unittest {

    // Buffer must satisfy the whole range hierarchy
    alias BB = Buffer!ubyte;
    static assert(isInputRange!BB);
    static assert(isForwardRange!BB);
    static assert(hasLength!BB);
    static assert(hasSlicing!BB);
    static assert(isBidirectionalRange!BB);
    static assert(isRandomAccessRange!BB);

    // two chunks: "abc" ~ "def"
    auto letters = Buffer!ubyte();
    letters.put("abc".representation.dup);
    letters.put("def".representation.dup);
    assert(letters.length == 6);
    assert(letters.toString == "abcdef");
    assert(letters.front == 'a');
    assert(letters.back == 'f');
    assert(equal(letters[0..$], "abcdef"));
    assert(equal(letters[$-2..$], "ef"));
    assert(letters == "abcdef");

    // trim one element from each end -> "bcde"
    letters.popFront;
    letters.popBack;
    assert(letters.front == 'b');
    assert(letters.back == 'e');
    assert(letters.length == 4);
    assert(retro(letters).front == 'e');
    assert(countUntil(letters, 'e') == 3);
    assert(equal(splitter(letters, 'c').array[1], ['d', 'e'])); // split "bcde" on 'c'
    assert(equal(letters, "bcde"));

    // -> "de"
    letters.popFront; letters.popFront;
    assert(letters.front == 'd');
    assert(letters.front == letters[0]);
    assert(letters.back == letters[$-1]);

    // header-like content spread over three chunks
    auto message = Buffer!ubyte();
    message.put("Header0: value0\n".representation.dup);
    message.put("Header1: value1\n".representation.dup);
    message.put("Header2: value2\n\nbody".representation.dup);
    auto lengthBefore = message.length;
    auto endOfHeaders = countUntil(message, "\n\n");
    assert(endOfHeaders == 47);
    assert(equal(findSplit(message, "\n\n")[2], "body"));
    // searching must not consume the buffer
    assert(message.length == lengthBefore);
    assert(message.indexOf('\n', 0) == 15);
    assert(message.indexOf('\n', 16) == 31);
    assert(message.indexOf('X', 16) == -1);
}
733 
/**
 * SSL/TLS settings: peer verification flag, CA certificate path and the
 * client key/certificate files with their file types.
 */
public struct SSLOptions {
    enum filetype {
        pem,
        asn1,
        der = asn1,
    }
    private {
        /**
         * do we need to verify peer?
         */
        bool     _verifyPeer = true;
        /**
         * path to CA cert
         */
        string   _caCert;
        /**
         * path to key file (can also contain cert (for pem)
         */
        string   _keyFile;
        /**
         * path to cert file (can also contain key (for pem)
         */
        string   _certFile;
        filetype _keyType = filetype.pem;
        filetype _certType = filetype.pem;
    }
    /// Returns: bit mask of configured files: bit 0 (value 1) - key file,
    /// bit 1 (value 2) - cert file. An empty path counts as "not configured".
    ubyte haveFiles() pure nothrow @safe @nogc {
        ubyte r = 0;
        // test the length, not the pointer: "" must not count as a file name
        if ( _keyFile.length  ) r|=1;
        if ( _certFile.length ) r|=2;
        return r;
    }
    /// do we want to verify peer certificates?
    bool getVerifyPeer() pure nothrow @nogc @safe {
        return _verifyPeer;
    }
    /// set peer verification flag; returns a copy of the updated options
    SSLOptions setVerifyPeer(bool v) pure nothrow @nogc @safe {
        _verifyPeer = v;
        return this;
    }
    /// set key file name and type (default - pem)
    auto setKeyFile(string f, filetype t = filetype.pem) @safe pure nothrow @nogc {
        _keyFile = f;
        _keyType = t;
        return this;
    }
    auto getKeyFile() @safe pure nothrow @nogc {
        return _keyFile;
    }
    auto getKeyType() @safe pure nothrow @nogc {
        return _keyType;
    }
    /// set cert file name and type (default - pem)
    auto setCertFile(string f, filetype t = filetype.pem) @safe pure nothrow @nogc {
        _certFile = f;
        _certType = t;
        return this;
    }
    /// set path to CA cert
    auto setCaCert(string p) @safe pure nothrow @nogc {
        _caCert = p;
        return this;
    }
    auto getCaCert() @safe pure nothrow @nogc {
        return _caCert;
    }
    auto getCertFile() @safe pure nothrow @nogc {
        return _certFile;
    }
    auto getCertType() @safe pure nothrow @nogc {
        return _certType;
    }
    /// set key file type by name; the name is looked up in sslKeyTypes
    /// ("pem", "asn1" or "der"), an unknown name fails the lookup
    void setKeyType(string t) @safe pure nothrow {
        _keyType = cast(filetype)sslKeyTypes[t];
    }
    /// set cert file type by name; the name is looked up in sslKeyTypes
    /// ("pem", "asn1" or "der"), an unknown name fails the lookup
    void setCertType(string t) @safe pure nothrow {
        _certType = cast(filetype)sslKeyTypes[t];
    }
}
/// file type name -> SSLOptions.filetype value, filled in the module constructor
static immutable int[string] sslKeyTypes;
shared static this() {
    sslKeyTypes = [
        "pem":SSLOptions.filetype.pem,
        "asn1":SSLOptions.filetype.asn1,
        "der":SSLOptions.filetype.der,
    ];
}
822 
823 version(vibeD) {
824 }
825 else {
826     extern(C) {
827         int SSL_library_init();
828     }
829 
830     enum SSL_VERIFY_PEER = 0x01;
831     enum SSL_FILETYPE_PEM = 1;
832     enum SSL_FILETYPE_ASN1 = 2;
833 
834     immutable int[SSLOptions.filetype] ft2ssl;
835 
836     shared static this() {
837         ft2ssl = [
838             SSLOptions.filetype.pem: SSL_FILETYPE_PEM,
839             SSLOptions.filetype.asn1: SSL_FILETYPE_ASN1,
840             SSLOptions.filetype.der: SSL_FILETYPE_ASN1
841         ];
842     }
843 
844     public class OpenSslSocket : Socket {
845         //enum SSL_MODE_RELEASE_BUFFERS = 0x00000010L;
846         private SSL* ssl;
847         private SSL_CTX* ctx;
848 
849         private void initSsl(SSLOptions opts) {
850             //ctx = SSL_CTX_new(SSLv3_client_method());
851             ctx = openssl.SSL_CTX_new(openssl.TLS_method());
852             assert(ctx !is null);
853             if ( opts.getVerifyPeer() ) {
854                 openssl.SSL_CTX_set_default_verify_paths(ctx);
855                 if ( opts.getCaCert() ) {
856                     openssl.SSL_CTX_load_verify_locations(ctx, cast(char*)opts.getCaCert().toStringz(), cast(char*)null);
857                 }
858                 openssl.SSL_CTX_set_verify(ctx, SSL_VERIFY_PEER, null);
859             }
860             immutable keyFile = opts.getKeyFile();
861             immutable keyType = opts.getKeyType();
862             immutable certFile = opts.getCertFile();
863             immutable certType = opts.getCertType();
864             final switch(opts.haveFiles()) {
865                 case 0b11:  // both files
866                     openssl.SSL_CTX_use_PrivateKey_file(ctx,  keyFile.toStringz(), ft2ssl[keyType]);
867                     openssl.SSL_CTX_use_certificate_file(ctx, certFile.toStringz(),ft2ssl[certType]);
868                     break;
869                 case 0b01:  // key only
870                     openssl.SSL_CTX_use_PrivateKey_file(ctx,  keyFile.toStringz(), ft2ssl[keyType]);
871                     openssl.SSL_CTX_use_certificate_file(ctx, keyFile.toStringz(), ft2ssl[keyType]);
872                     break;
873                 case 0b10:  // cert only
874                     openssl.SSL_CTX_use_PrivateKey_file(ctx,  certFile.toStringz(), ft2ssl[certType]);
875                     openssl.SSL_CTX_use_certificate_file(ctx, certFile.toStringz(), ft2ssl[certType]);
876                     break;
877                 case 0b00:
878                     break;
879             }
880             //SSL_CTX_set_mode(ctx, SSL_MODE_RELEASE_BUFFERS);
881             //SSL_CTX_ctrl(ctx, 33, SSL_MODE_RELEASE_BUFFERS, null);
882             ssl = openssl.SSL_new(ctx);
883             openssl.SSL_set_fd(ssl, cast(int)this.handle);
884         }
885 
886         @trusted
887         override void connect(Address dest) {
888             super.connect(dest);
889             if(openssl.SSL_connect(ssl) == -1) {
890                 throw new Exception("ssl connect failed: %s".format(to!string(openssl.ERR_reason_error_string(openssl.ERR_get_error()))));
891             }
892         }
893         auto connectSSL() {
894             if(openssl.SSL_connect(ssl) == -1) {
895                 throw new Exception("ssl connect failed: %s".format(to!string(openssl.ERR_reason_error_string(openssl.ERR_get_error()))));
896             }
897             debug(requests) tracef("ssl socket connected");
898             return this;
899         }
900         @trusted
901         override ptrdiff_t send(scope const(void)[] buf, SocketFlags flags) scope {
902             return openssl.SSL_write(ssl, buf.ptr, cast(uint) buf.length);
903         }
904         override ptrdiff_t send(scope const(void)[] buf) scope {
905             return send(buf, SocketFlags.NONE);
906         }
907         @trusted
908         override ptrdiff_t receive(scope void[] buf, SocketFlags flags) scope {
909             return openssl.SSL_read(ssl, buf.ptr, cast(int)buf.length);
910         }
911         override ptrdiff_t receive(scope void[] buf) scope {
912             return receive(buf, SocketFlags.NONE);
913         }
914         this(AddressFamily af, SocketType type = SocketType.STREAM, SSLOptions opts = SSLOptions()) {
915             super(af, type);
916             initSsl(opts);
917         }
918         this(socket_t sock, AddressFamily af, SSLOptions opts = SSLOptions()) {
919             super(sock, af);
920             initSsl(opts);
921         }
922         override void close() scope {
923             super.close();
924             if ( ssl !is null ) {
925                 openssl.SSL_free(ssl);
926                 ssl = null;
927             }
928             if ( ctx !is null ) {
929                 openssl.SSL_CTX_free(ctx);
930                 ctx = null;
931             }
932         }
933         void SSL_set_tlsext_host_name(string host) {
934 
935         }
936     }
937 
938     public class SSLSocketStream: SocketStream {
939         private SSLOptions _sslOptions;
940         private Socket underlyingSocket;
941         private SSL* ssl;
942         private string host;
943 
944         this(SSLOptions opts) {
945             _sslOptions = opts;
946         }
947         this(NetworkStream ostream, SSLOptions opts, string host = null) {
948             _sslOptions = opts;
949             this.host = host;
950             auto osock = ostream.so();
951             underlyingSocket = osock;
952             osock.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1);
953             auto ss = new OpenSslSocket(osock.handle, osock.addressFamily, _sslOptions);
954             ssl = ss.ssl;
955             if ( host !is null ) {
956                 openssl.SSL_set_tlsext_host_name(ssl, toStringz(host));
957             }
958             ss.connectSSL();
959             __isOpen = true;
960             __isConnected = true;
961             s = ss;
962             debug(requests) tracef("ssl stream created from another stream: %s", s);
963         }
964         override void close() {
965             ssl = null;
966             host = null;
967             super.close();
968             if ( underlyingSocket ) {
969                 underlyingSocket.close();
970             }
971         }
972         override void open(AddressFamily fa) {
973             if ( s !is null ) {
974                 s.close();
975             }
976             auto ss = new OpenSslSocket(fa, SocketType.STREAM, _sslOptions);
977             assert(ss !is null, "Can't create socket");
978             ssl = ss.ssl;
979             if ( host !is null ) {
980                 openssl.SSL_set_tlsext_host_name(ssl, toStringz(host));
981             }
982             s = ss;
983             s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1);
984             __isOpen = true;
985         }
986         override SocketStream connect(string h, ushort p, Duration timeout = 10.seconds) {
987             host = h;
988             return super.connect(h, p, timeout);
989         }
990         override SSLSocketStream accept() {
991             auto newso = s.accept();
992             if ( s is null ) {
993                 return null;
994             }
995             auto newstream = new SSLSocketStream(_sslOptions);
996             auto sslSocket = new OpenSslSocket(newso.handle, s.addressFamily);
997             newstream.s = sslSocket;
998             newstream.__isOpen = true;
999             newstream.__isConnected = true;
1000             return newstream;
1001         }
1002     }
1003     public class TCPSocketStream : SocketStream {
1004         override void open(AddressFamily fa) {
1005             if ( s !is null ) {
1006                 s.close();
1007             }
1008             s = new Socket(fa, SocketType.STREAM, ProtocolType.TCP);
1009             assert(s !is null, "Can't create socket");
1010             __isOpen = true;
1011             s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1);
1012         }
1013         override TCPSocketStream accept() {
1014             auto newso = s.accept();
1015             if ( s is null ) {
1016                 return null;
1017             }
1018             auto newstream = new TCPSocketStream();
1019             newstream.s = newso;
1020             newstream.__isOpen = true;
1021             newstream.__isConnected = true;
1022             newstream.s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1);
1023             return newstream;
1024         }
1025     }
1026 }
1027 
/**
 * Common interface of the network transports in this module.
 */
public interface NetworkStream
{
    /// True when the stream is connected.
    @property bool isConnected() const;
    /// True when the stream is open.
    @property bool isOpen() const;

    /// Close the stream.
    @trusted void close();

    /**
     * Connect to `host`:`port`.
     *
     * `timeout` is the socket write timeout.
     */
    NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds);

    /// Send `buff`; returns the number of bytes sent.
    ptrdiff_t send(const(void)[] buff);
    /// Receive into `buff`; returns the number of bytes received.
    ptrdiff_t receive(void[] buff);

    /// Accept an incoming connection.
    NetworkStream accept();
    /// Enable or disable address reuse.
    @property void reuseAddr(bool);
    /// Bind to an address given as a string.
    void bind(string);
    /// Bind to an already resolved address.
    void bind(Address);
    /// Start listening with the given backlog.
    void listen(int);

    /// Access to the underlying transport object.
    version(vibeD)
    {
        TCPConnection so();
    }
    else
    {
        Socket so();
    }

    /**
     * Set timeout for receive calls. 0 means no timeout.
     */
    @property void readTimeout(Duration timeout);
}
1057 
/**
 * Base class for streams built on std.socket.Socket.
 *
 * Subclasses provide open() (socket creation) and accept().
 */
public abstract class SocketStream : NetworkStream {
    private {
        Duration timeout;
        Socket   s;
        bool     __isOpen;
        bool     __isConnected;
        string   _bind;
    }
    /// Create the socket for address family `fa`. The base version does nothing.
    void open(AddressFamily fa) {
    }
    /// The socket currently used by the stream (may be null).
    @property Socket so() @safe pure {
        return s;
    }
    /// True when a socket exists and the stream is marked open.
    @property bool isOpen() @safe @nogc pure const {
        return s && __isOpen;
    }
    /// True when a socket exists and the stream is marked open and connected.
    @property bool isConnected() @safe @nogc pure const {
        return s && __isOpen && __isConnected;
    }
    /// Close the socket (if open) and drop the reference to it.
    void close() @trusted {
        // can be called from dtor in GC, no gc calls here
        if ( isOpen ) {
            s.close();
            __isOpen = false;
            __isConnected = false;
        }
        s = null;
    }
    /***
    *  bind() just remembers the address. We will call bind() at the time of connect as
    *  we can have several connection trials.
    ***/
    override void bind(string to) {
        _bind = to;
    }
    /***
    *  Make connection to remote site. Bind, handle connection error, try several addresses, etc
    *
    *  Params:
    *    host    = name or address to resolve.
    *    port    = remote port.
    *    timeout = applied to the socket as its send timeout (SNDTIMEO).
    *
    *  Returns: this stream.
    *  Throws: ConnectError when the name cannot be resolved or no address
    *          could be connected.
    ***/
    SocketStream connect(string host, ushort port, Duration timeout = 10.seconds) {
        // Pass the arguments to tracef directly: a pre-formatted string would
        // be interpreted as a format string again.
        debug(requests) tracef("Create connection to %s:%d", host, port);
        Address[] addresses;
        __isConnected = false;
        try {
            addresses = getAddress(host, port);
        } catch (Exception e) {
            throw new ConnectError("Can't resolve name when connect to %s:%d: %s".format(host, port, e.msg));
        }
        foreach(a; addresses) {
            debug(requests) tracef("Trying %s", a);
            try {
                open(a.addressFamily);
                if ( _bind !is null ) {
                    auto ad = getAddress(_bind);
                    debug(requests) tracef("bind to %s", ad[0]);
                    s.bind(ad[0]);
                }
                s.setOption(SocketOptionLevel.SOCKET, SocketOption.SNDTIMEO, timeout);
                s.connect(a);
                debug(requests) tracef("Connected to %s", a);
                __isConnected = true;
                break;
            } catch (SocketException e) {
                debug(requests) warningf("Failed to connect to %s:%d(%s): %s", host, port, a, e.msg);
                // open() may have failed before any socket was created.
                if ( s !is null ) {
                    s.close();
                }
            }
        }
        if ( !__isConnected ) {
            throw new ConnectError("Can't connect to %s:%d".format(host, port));
        }
        return this;
    }

    /**
     * Send `buff` through the socket.
     *
     * Returns: number of bytes sent.
     * Throws: NetworkException (after closing the stream) when the socket
     *         reports an error.
     */
    ptrdiff_t send(const(void)[] buff)
    in {assert(isConnected);}
    do {
        auto rc = s.send(buff);
        if (rc < 0) {
            // Save errno first: close() may change it.
            immutable e = errno;
            close();
            throw new NetworkException("sending data: %s".format(to!string(strerror(e))));
        }
        return rc;
    }

    /**
     * Receive data into `buff`.
     *
     * On Posix an interrupted call (EINTR) is retried. Any other error
     * closes the stream.
     *
     * Returns: number of bytes received.
     * Throws: TimeoutException when the error is reported as a timeout
     *         (EAGAIN on Posix, errno 0 on Windows), NetworkException for
     *         other errors.
     */
    ptrdiff_t receive(void[] buff) {
        while (true) {
            auto r = s.receive(buff);
            if (r < 0) {
                // Saved before close(), which may change errno.
                auto e = errno;
                version(Windows) {
                    close();
                    if ( e == 0 ) {
                        throw new TimeoutException("Timeout receiving data");
                    }
                    throw new NetworkException("Unexpected error %s while receiving data".format(to!string(strerror(e))));
                }
                version(Posix) {
                    if ( e == EINTR ) {
                        continue;
                    }
                    close();
                    if ( e == EAGAIN ) {
                        throw new TimeoutException("Timeout receiving data");
                    }
                    throw new NetworkException("Unexpected error %s while receiving data".format(to!string(strerror(e))));
                }
            }
            return r;
        }
        assert(false);
    }

    /// Set the receive timeout (RCVTIMEO); ignored while not connected.
    @property void readTimeout(Duration timeout) @safe {
        if ( __isConnected )
        {
            s.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, timeout);
        }
    }
    /// Must be provided by subclasses that support accepting connections.
    override SocketStream accept() {
        assert(false, "Implement before use");
    }
    /// Switch the REUSEADDR socket option on or off.
    @property override void reuseAddr(bool yes){
        s.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, yes ? 1 : 0);
    }
    /// Bind the current socket to `addr` immediately.
    override void bind(Address addr){
        s.bind(addr);
    }
    /// Put the current socket into listening state with backlog `n`.
    override void listen(int n) {
        s.listen(n);
    }
}
1196 
1197 version (vibeD) {
1198     import vibe.core.net, vibe.stream.tls;
1199 
1200     public class TCPVibeStream : NetworkStream {
1201     private:
1202         TCPConnection _conn;
1203         Duration _readTimeout = Duration.max;
1204         bool _isOpen = true;
1205         string _bind;
1206 
1207     public:
1208         @property bool isConnected() const {
1209             return _conn.connected;
1210         }
1211         @property override bool isOpen() const {
1212             return _conn && _isOpen;
1213         }
1214         void close() @trusted {
1215             _conn.close();
1216             _isOpen = false;
1217         }
1218         override TCPConnection so() {
1219             return _conn;
1220         }
1221         override void bind(string to) {
1222                 _bind = to;
1223         }
1224         NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) {
1225             // FIXME: timeout not supported in vibe.d
1226             try {
1227                 _conn = connectTCP(host, port, _bind);
1228             }
1229             catch (Exception e)
1230                 throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e);
1231 
1232             return this;
1233         }
1234 
1235         ptrdiff_t send(const(void)[] buff) {
1236             _conn.write(cast(const(ubyte)[])buff);
1237             return buff.length;
1238         }
1239 
1240         ptrdiff_t receive(void[] buff) {
1241             if (!_conn.waitForData(_readTimeout)) {
1242                 if (!_conn.connected || _conn.empty ) {
1243                     return 0;
1244                 }
1245                 throw new TimeoutException("Timeout receiving data");
1246             }
1247 
1248             if(_conn.empty) {
1249                 return 0;
1250             }
1251 
1252             auto chunk = min(_conn.leastSize, buff.length);
1253             assert(chunk != 0);
1254             _conn.read(cast(ubyte[])buff[0 .. chunk]);
1255             return chunk;
1256         }
1257 
1258         @property void readTimeout(Duration timeout) {
1259             if (timeout == 0.seconds) {
1260                 _readTimeout = Duration.max;
1261             }
1262             else {
1263                 _readTimeout = timeout;
1264             }
1265         }
1266         override TCPVibeStream accept() {
1267             assert(false, "Must be implemented");
1268         }
1269         override @property void reuseAddr(bool){
1270             assert(false, "Not Implemented");
1271         }
1272         override void bind(Address){
1273             assert(false, "Not Implemented");
1274         }
1275         override void listen(int){
1276             assert(false, "Not Implemented");
1277         }
1278     }
1279 
1280     public class SSLVibeStream : TCPVibeStream {
1281     private:
1282         TLSStream _sslStream;
1283         bool   _isOpen = true;
1284         SSLOptions _sslOptions;
1285         TCPConnection underlyingConnection;
1286 
1287         void connectSSL(string host) {
1288             auto sslctx = createTLSContext(TLSContextKind.client);
1289             if ( _sslOptions.getVerifyPeer() ) {
1290                 if ( _sslOptions.getCaCert() == null ) {
1291                     throw new ConnectError("With vibe.d you have to call setCaCert() before verify server certificate.");
1292                 }
1293                 sslctx.useTrustedCertificateFile(_sslOptions.getCaCert());
1294                 sslctx.peerValidationMode = TLSPeerValidationMode.trustedCert;
1295             } else {
1296                 sslctx.peerValidationMode = TLSPeerValidationMode.none;
1297             }
1298             immutable keyFile = _sslOptions.getKeyFile();
1299             immutable certFile = _sslOptions.getCertFile();
1300             final switch(_sslOptions.haveFiles()) {
1301                 case 0b11:  // both files
1302                     sslctx.usePrivateKeyFile(keyFile);
1303                     sslctx.useCertificateChainFile(certFile);
1304                     break;
1305                 case 0b01:  // key only
1306                     sslctx.usePrivateKeyFile(keyFile);
1307                     sslctx.useCertificateChainFile(keyFile);
1308                     break;
1309                 case 0b10:  // cert only
1310                     sslctx.usePrivateKeyFile(certFile);
1311                     sslctx.useCertificateChainFile(certFile);
1312                     break;
1313                 case 0b00:
1314                     break;
1315             }
1316             _sslStream = createTLSStream(_conn, sslctx, host);
1317         }
1318 
1319     public:
1320         this(SSLOptions opts) {
1321             _sslOptions = opts;
1322         }
1323         override TCPConnection so() {
1324             return _conn;
1325         }
1326         this(NetworkStream ostream, SSLOptions opts, string host = null) {
1327             _sslOptions = opts;
1328             auto oconn = ostream.so();
1329             underlyingConnection = oconn;
1330             _conn = oconn;
1331             connectSSL(host);
1332         }
1333         override NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) {
1334             try {
1335                 _conn = connectTCP(host, port);
1336                 connectSSL(host);
1337             }
1338             catch (ConnectError e) {
1339                 throw e;
1340             }
1341             catch (Exception e) {
1342                 throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e);
1343             }
1344 
1345             return this;
1346         }
1347 
1348         override ptrdiff_t send(const(void)[] buff) {
1349             _sslStream.write(cast(const(ubyte)[])buff);
1350             return buff.length;
1351         }
1352 
1353         override ptrdiff_t receive(void[] buff) {
1354             if (!_sslStream.dataAvailableForRead) {
1355                 if (!_conn.waitForData(_readTimeout)) {
1356                     if (!_conn.connected) {
1357                         return 0;
1358                     }
1359                     throw new TimeoutException("Timeout receiving data");
1360                 }
1361             }
1362 
1363             if(_sslStream.empty) {
1364                 return 0;
1365             }
1366 
1367             auto chunk = min(_sslStream.leastSize, buff.length);
1368             assert(chunk != 0);
1369             _sslStream.read(cast(ubyte[])buff[0 .. chunk]);
1370             return chunk;
1371         }
1372 
1373         override void close() @trusted {
1374             if ( _sslStream )
1375             {
1376                 _sslStream.finalize();
1377             }
1378             _conn.close();
1379             _isOpen = false;
1380         }
1381         @property override bool isOpen() const {
1382             return _conn && _isOpen;
1383         }
1384         override SSLVibeStream accept() {
1385             assert(false, "Must be implemented");
1386         }
1387         override @property void reuseAddr(bool){
1388             assert(false, "Not Implemented");
1389         }
1390         override void bind(Address){
1391             assert(false, "Not Implemented");
1392         }
1393         override void listen(int){
1394             assert(false, "Not Implemented");
1395         }
1396     }
1397 }
1398 
// Public names for the stream implementations selected at build time:
// the vibe.d based classes under version(vibeD), the std.socket based
// classes otherwise.
version (vibeD)
{
    public
    {
        alias TCPStream = TCPVibeStream;
        alias SSLStream = SSLVibeStream;
    }
}
else
{
    public
    {
        alias TCPStream = TCPSocketStream;
        alias SSLStream = SSLSocketStream;
    }
}