1 module requests.streams; 2 3 private: 4 import std.algorithm; 5 import std.array; 6 import std.conv; 7 import std.experimental.logger; 8 import std.exception; 9 import std.format; 10 import std.range; 11 import std.range.primitives; 12 import std.string; 13 import std.stdio; 14 import std.traits; 15 import std.zlib; 16 import std.datetime; 17 import std.socket; 18 import core.stdc.errno; 19 import core.stdc.string; 20 21 import requests.ssl_adapter : openssl, SSL, SSL_CTX; 22 23 alias InDataHandler = DataPipeIface!ubyte; 24 25 public class ConnectError: Exception { 26 this(string message, string file =__FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow { 27 super(message, file, line, next); 28 } 29 } 30 31 class DecodingException: Exception { 32 this(string message, string file =__FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow { 33 super(message, file, line, next); 34 } 35 } 36 37 public class TimeoutException: Exception { 38 this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow { 39 super(message, file, line, next); 40 } 41 } 42 43 public class NetworkException: Exception { 44 this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow { 45 super(message, file, line, next); 46 } 47 } 48 49 /** 50 * DataPipeIface can accept some data, process, and return processed data. 51 */ 52 public interface DataPipeIface(E) { 53 /// Is there any processed data ready for reading? 54 bool empty(); 55 /// Put next data portion for processing 56 //void put(E[]); 57 void putNoCopy(E[]); 58 /// Get any ready data 59 E[] get(); 60 /// Signal on end of incoming data stream. 61 void flush(); 62 } 63 /** 64 * DataPipe is a pipeline of data processors, each accept some data, process it, and put result to next element in line. 65 * This class used to combine different Transfer- and Content- encodings. For example: unchunk transfer-encoding "chunnked", 66 * and uncompress Content-Encoding "gzip". 67 */ 68 public class DataPipe(E) : DataPipeIface!E { 69 70 DataPipeIface!(E)[] pipe; 71 Buffer!E buffer; 72 /// Append data processor to pipeline 73 /// Params: 74 /// p = processor 75 final void insert(DataPipeIface!E p) { 76 pipe ~= p; 77 } 78 final E[][] process(DataPipeIface!E p, E[][] data) { 79 E[][] result; 80 data.each!(e => p.putNoCopy(e)); 81 while(!p.empty()) result ~= p.get(); 82 return result; 83 } 84 /// Process next data portion. Data passed over pipeline and store result in buffer. 85 /// Params: 86 /// data = input data buffer. 87 /// NoCopy means we do not copy data to buffer, we keep reference 88 final void putNoCopy(E[] data) { 89 if ( pipe.empty ) { 90 buffer.putNoCopy(data); 91 return; 92 } 93 try { 94 auto t = process(pipe.front, [data]); 95 foreach(ref p; pipe[1..$]) { 96 t = process(p, t); 97 } 98 t.each!(b => buffer.putNoCopy(b)); 99 } 100 catch (Exception e) { 101 throw new DecodingException(e.msg); 102 } 103 } 104 /// Get what was collected in internal buffer and clear it. 105 /// Returns: 106 /// data collected. 107 final E[] get() { 108 if ( buffer.empty ) { 109 return E[].init; 110 } 111 auto res = buffer.data; 112 buffer = Buffer!E.init; 113 return res; 114 } 115 /// 116 /// get without datamove. but user receive [][] 117 /// 118 final E[][] getNoCopy() { 119 if ( buffer.empty ) { 120 return E[][].init; 121 } 122 E[][] res = buffer.__repr.__buffer; 123 buffer = Buffer!E.init; 124 return res; 125 } 126 /// Test if internal buffer is empty 127 /// Returns: 128 /// true if internal buffer is empty (nothing to get()) 129 final bool empty() pure const @safe { 130 return buffer.empty; 131 } 132 final void flush() { 133 E[][] product; 134 foreach(ref p; pipe) { 135 product.each!(e => p.putNoCopy(e)); 136 p.flush(); 137 product.length = 0; 138 while( !p.empty ) product ~= p.get(); 139 } 140 product.each!(b => buffer.putNoCopy(b)); 141 } 142 } 143 144 /** 145 * Processor for gzipped/compressed content. 146 * Also support InputRange interface. 147 */ 148 public class Decompressor(E) : DataPipeIface!E { 149 private { 150 Buffer!ubyte __buff; 151 UnCompress __zlib; 152 } 153 this() { 154 __buff = Buffer!ubyte(); 155 __zlib = new UnCompress(); 156 } 157 final override void putNoCopy(E[] data) { 158 if ( __zlib is null ) { 159 __zlib = new UnCompress(); 160 } 161 __buff.putNoCopy(__zlib.uncompress(data)); 162 } 163 final override E[] get() pure { 164 assert(__buff.length); 165 auto r = __buff.__repr.__buffer[0]; 166 __buff.popFrontN(r.length); 167 return cast(E[])r; 168 } 169 final override void flush() { 170 if ( __zlib is null ) { 171 return; 172 } 173 __buff.put(__zlib.flush()); 174 } 175 final override @property bool empty() const pure @safe { 176 debug(requests) tracef("empty=%b", __buff.empty); 177 return __buff.empty; 178 } 179 final @property auto ref front() pure const @safe { 180 debug(requests) tracef("front: buff length=%d", __buff.length); 181 return __buff.front; 182 } 183 final @property auto popFront() pure @safe { 184 debug(requests) tracef("popFront: buff length=%d", __buff.length); 185 return __buff.popFront; 186 } 187 final @property void popFrontN(size_t n) pure @safe { 188 __buff.popFrontN(n); 189 } 190 } 191 192 /** 193 * Unchunk chunked http responce body. 194 */ 195 public class DecodeChunked : DataPipeIface!ubyte { 196 // length := 0 197 // read chunk-size, chunk-extension (if any) and CRLF 198 // while (chunk-size > 0) { 199 // read chunk-data and CRLF 200 // append chunk-data to entity-body 201 // length := length + chunk-size 202 // read chunk-size and CRLF 203 // } 204 // read entity-header 205 // while (entity-header not empty) { 206 // append entity-header to existing header fields 207 // read entity-header 208 // } 209 // Content-Length := length 210 // Remove "chunked" from Transfer-Encoding 211 // 212 213 // Chunked-Body = *chunk 214 // last-chunk 215 // trailer 216 // CRLF 217 // 218 // chunk = chunk-size [ chunk-extension ] CRLF 219 // chunk-data CRLF 220 // chunk-size = 1*HEX 221 // last-chunk = 1*("0") [ chunk-extension ] CRLF 222 // 223 // chunk-extension= *( ";" chunk-ext-name [ "=" chunk-ext-val ] ) 224 // chunk-ext-name = token 225 // chunk-ext-val = token | quoted-string 226 // chunk-data = chunk-size(OCTET) 227 // trailer = *(entity-header CRLF) 228 229 alias eType = ubyte; 230 immutable eType[] CRLF = ['\r', '\n']; 231 private { 232 enum States {huntingSize, huntingSeparator, receiving, trailer}; 233 char state = States.huntingSize; 234 size_t chunk_size, to_receive; 235 Buffer!ubyte buff; 236 ubyte[] linebuff; 237 } 238 final void putNoCopy(eType[] data) { 239 while ( data.length ) { 240 if ( state == States.trailer ) { 241 to_receive = to_receive - min(to_receive, data.length); 242 return; 243 } 244 if ( state == States.huntingSize ) { 245 import std.ascii; 246 ubyte[10] digits; 247 int i; 248 for(i=0;i<data.length;i++) { 249 ubyte v = data[i]; 250 digits[i] = v; 251 if ( v == '\n' ) { 252 i+=1; 253 break; 254 } 255 } 256 linebuff ~= digits[0..i]; 257 if ( linebuff.length >= 80 ) { 258 throw new DecodingException("Can't find chunk size in the body"); 259 } 260 data = data[i..$]; 261 if (!linebuff.canFind(CRLF)) { 262 continue; 263 } 264 chunk_size = linebuff.filter!isHexDigit.map!toUpper.map!"a<='9'?a-'0':a-'A'+10".reduce!"a*16+b"; 265 state = States.receiving; 266 to_receive = chunk_size; 267 if ( chunk_size == 0 ) { 268 to_receive = 2-min(2, data.length); // trailing \r\n 269 state = States.trailer; 270 return; 271 } 272 continue; 273 } 274 if ( state == States.receiving ) { 275 if (to_receive > 0 ) { 276 auto can_store = min(to_receive, data.length); 277 buff.putNoCopy(data[0..can_store]); 278 data = data[can_store..$]; 279 to_receive -= can_store; 280 //tracef("Unchunked %d bytes from %d", can_store, chunk_size); 281 if ( to_receive == 0 ) { 282 //tracef("switch to huntig separator"); 283 state = States.huntingSeparator; 284 continue; 285 } 286 continue; 287 } 288 assert(false); 289 } 290 if ( state == States.huntingSeparator ) { 291 if ( data[0] == '\n' || data[0]=='\r') { 292 data = data[1..$]; 293 continue; 294 } 295 state = States.huntingSize; 296 linebuff.length = 0; 297 continue; 298 } 299 } 300 } 301 final eType[] get() { 302 auto r = buff.__repr.__buffer[0]; 303 buff.popFrontN(r.length); 304 return r; 305 } 306 final void flush() { 307 } 308 final bool empty() { 309 debug(requests) tracef("empty=%b", buff.empty); 310 return buff.empty; 311 } 312 final bool done() { 313 return state==States.trailer && to_receive==0; 314 } 315 } 316 317 unittest { 318 info("Testing DataPipe"); 319 globalLogLevel(LogLevel.info); 320 alias eType = char; 321 eType[] gzipped = [ 322 0x1F, 0x8B, 0x08, 0x00, 0xB1, 0xA3, 0xEA, 0x56, 323 0x00, 0x03, 0x4B, 0x4C, 0x4A, 0xE6, 0x4A, 0x49, 324 0x4D, 0xE3, 0x02, 0x00, 0x75, 0x0B, 0xB0, 0x88, 325 0x08, 0x00, 0x00, 0x00 326 ]; // "abc\ndef\n" 327 auto d = new Decompressor!eType(); 328 d.putNoCopy(gzipped[0..2].dup); 329 d.putNoCopy(gzipped[2..10].dup); 330 d.putNoCopy(gzipped[10..$].dup); 331 d.flush(); 332 assert(equal(d.filter!(a => a!='b'), "ac\ndef\n")); 333 334 auto e = new Decompressor!eType(); 335 e.putNoCopy(gzipped[0..10].dup); 336 e.putNoCopy(gzipped[10..$].dup); 337 e.flush(); 338 assert(equal(e.filter!(a => a!='b'), "ac\ndef\n")); 339 // writeln(gzipped.decompress.filter!(a => a!='b').array); 340 auto dp = new DataPipe!eType; 341 dp.insert(new Decompressor!eType()); 342 dp.putNoCopy(gzipped[0..2].dup); 343 dp.putNoCopy(gzipped[2..$].dup); 344 dp.flush(); 345 assert(equal(dp.get(), "abc\ndef\n")); 346 // empty datapipe shoul just pass input to output 347 auto dpu = new DataPipe!ubyte; 348 dpu.putNoCopy("abcd".dup.representation); 349 dpu.putNoCopy("efgh".dup.representation); 350 dpu.flush(); 351 assert(equal(dpu.get(), "abcdefgh")); 352 info("Test unchunker properties"); 353 ubyte[] twoChunks = "2\r\n12\r\n2\r\n34\r\n0\r\n\r\n".dup.representation; 354 ubyte[][] result; 355 auto uc = new DecodeChunked(); 356 uc.putNoCopy(twoChunks); 357 while(!uc.empty) { 358 result ~= uc.get(); 359 } 360 assert(equal(result[0], ['1', '2'])); 361 assert(equal(result[1], ['3', '4'])); 362 info("unchunker correctness - ok"); 363 result[0][0] = '5'; 364 assert(twoChunks[3] == '5'); 365 info("unchunker zero copy - ok"); 366 info("Testing DataPipe - done"); 367 } 368 369 public class LineSplitter : DataPipeIface!ubyte 370 { 371 private 372 { 373 Buffer!ubyte buff; 374 int NL = -1; 375 int scanned; 376 } 377 bool empty() 378 { 379 return NL == -1; 380 } 381 void putNoCopy(ubyte[] s) 382 { 383 debug(requests) tracef("put data: <<%s>>", cast(string)s); 384 buff.putNoCopy(s); 385 if (NL == -1) 386 { 387 auto new_NL = buff.indexOf(cast(ubyte)'\n', scanned); 388 NL = new_NL; 389 debug(requests) tracef("new nl=%d", NL); 390 } 391 scanned = cast(int)buff.length; 392 } 393 ubyte[] get() 394 { 395 if (NL>=0) 396 { 397 auto res = buff[0..NL+1].data; 398 buff.popFrontN(NL+1); 399 NL = buff.indexOf(cast(ubyte)'\n', 0); 400 if (NL == -1) 401 { 402 scanned = cast(int)buff.length; 403 } 404 return res; 405 } 406 return new ubyte[](0); 407 } 408 void flush() 409 { 410 NL = cast(int)buff.length - 1; 411 } 412 } 413 unittest 414 { 415 info("testing LineSplitter"); 416 LineSplitter ls = new LineSplitter(); 417 ls.putNoCopy("abc".representation.dup); 418 assert(ls.empty); 419 ls.putNoCopy("\n".representation.dup); 420 assert(!ls.empty); 421 assert(equal(ls.get, "abc\n")); 422 assert(ls.empty); 423 assert(equal(ls.get,"")); 424 ls.putNoCopy("def\n".representation.dup); 425 ls.putNoCopy("ghi\njk".representation.dup); 426 assert(!ls.empty); 427 assert(equal(ls.get, "def\n")); 428 assert(!ls.empty); 429 assert(equal(ls.get, "ghi\n")); 430 assert(ls.empty); 431 ls.flush(); 432 assert(equal(ls.get, "jk")); 433 } 434 /** 435 * Buffer used to collect and process data from network. It remainds Appender, but support 436 * also Range interface. 437 * $(P To place data in buffer use put() method.) 438 * $(P To retrieve data from buffer you can use several methods:) 439 * $(UL 440 * $(LI Range methods: front, back, index []) 441 * $(LI data method: return collected data (like Appender.data)) 442 * ) 443 */ 444 enum CACHESIZE = 1024; 445 446 static long reprAlloc; 447 static long reprCacheHit; 448 static long reprCacheRequests; 449 450 451 public struct Buffer(T) { 452 // static Repr[CACHESIZE] cache; 453 // static uint cacheIndex; 454 455 private { 456 Repr cachedOrNew() { 457 return new Repr; 458 // reprCacheRequests++; 459 // if ( false && cacheIndex>0 ) { 460 // reprCacheHit++; 461 // cacheIndex -= 1; 462 // return cache[cacheIndex]; 463 // } else { 464 // return new Repr; 465 // } 466 } 467 class Repr { 468 size_t __length; 469 Unqual!T[][] __buffer; 470 this() { 471 reprAlloc++; 472 __length = 0; 473 } 474 this(Repr other) { 475 reprAlloc++; 476 if ( other is null ) 477 return; 478 __length = other.__length; 479 __buffer = other.__buffer.dup; 480 } 481 } 482 Repr __repr; 483 } 484 485 alias toString = data!string; 486 487 this(this) { 488 if ( !__repr ) { 489 return; 490 } 491 __repr = new Repr(__repr); 492 } 493 this(U)(U[] data) { 494 put(data); 495 } 496 ~this() { 497 __repr = null; 498 } 499 /*************** 500 * store data. Data copied 501 */ 502 auto put(U)(U[] data) { 503 if ( data.length == 0 ) { 504 return; 505 } 506 if ( !__repr ) { 507 __repr = cachedOrNew(); 508 } 509 static if (!is(U == T)) { 510 auto d = cast(T[])(data); 511 __repr.__length += d.length; 512 __repr.__buffer ~= d.dup; 513 } else { 514 __repr.__length += data.length; 515 __repr.__buffer ~= data.dup; 516 } 517 return; 518 } 519 auto putNoCopy(U)(U[] data) { 520 if ( data.length == 0 ) { 521 return; 522 } 523 if ( !__repr ) { 524 __repr = cachedOrNew(); 525 } 526 static if (!is(U == T)) { 527 auto d = cast(T[])(data); 528 __repr.__length += d.length; 529 __repr.__buffer ~= d; 530 } else { 531 __repr.__length += data.length; 532 __repr.__buffer ~= data; 533 } 534 return; 535 } 536 @property auto opDollar() const pure @safe { 537 return __repr.__length; 538 } 539 @property size_t length() const pure @safe { 540 if ( !__repr ) { 541 return 0; 542 } 543 return __repr.__length; 544 } 545 @property auto empty() const pure @safe { 546 return length == 0; 547 } 548 @property auto ref front() const pure @safe { 549 assert(length); 550 return __repr.__buffer.front.front; 551 } 552 @property auto ref back() const pure @safe { 553 assert(length); 554 return __repr.__buffer.back.back; 555 } 556 @property void popFront() pure @safe { 557 assert(length); 558 with ( __repr ) { 559 __buffer.front.popFront; 560 if ( __buffer.front.length == 0 ) { 561 __buffer.popFront; 562 } 563 __length--; 564 } 565 } 566 @property void popFrontN(size_t n) pure @safe { 567 assert(n <= length, "lengnt: %d, n=%d".format(length, n)); 568 __repr.__length -= n; 569 while( n ) { 570 if ( n <= __repr.__buffer.front.length ) { 571 __repr.__buffer.front.popFrontN(n); 572 if ( __repr.__buffer.front.length == 0 ) { 573 __repr.__buffer.popFront; 574 } 575 return; 576 } 577 n -= __repr.__buffer.front.length; 578 __repr.__buffer.popFront; 579 } 580 } 581 @property void popBack() pure @safe { 582 assert(length); 583 __repr.__buffer.back.popBack; 584 if ( __repr.__buffer.back.length == 0 ) { 585 __repr.__buffer.popBack; 586 } 587 __repr.__length--; 588 } 589 @property void popBackN(size_t n) pure @safe { 590 assert(n <= length, "n: %d, length: %d".format(n, length)); 591 __repr.__length -= n; 592 while( n ) { 593 if ( n <= __repr.__buffer.back.length ) { 594 __repr.__buffer.back.popBackN(n); 595 if ( __repr.__buffer.back.length == 0 ) { 596 __repr.__buffer.popBack; 597 } 598 return; 599 } 600 n -= __repr.__buffer.back.length; 601 __repr.__buffer.popBack; 602 } 603 } 604 @property auto save() @safe { 605 auto n = Buffer!T(); 606 n.__repr = new Repr(__repr); 607 return n; 608 } 609 @property auto ref opIndex(size_t n) const pure @safe { 610 assert( __repr && n < __repr.__length ); 611 foreach(b; __repr.__buffer) { 612 if ( n < b.length ) { 613 return b[n]; 614 } 615 n -= b.length; 616 } 617 assert(false, "Impossible"); 618 } 619 int indexOf(ubyte needle, int pos) 620 { 621 if ( pos >= length || !__repr || !__repr.__buffer) 622 { 623 return -1; 624 } 625 // skip pos bytes 626 debug(requests) tracef("search %d from pos %d", needle, pos); 627 int cp; 628 foreach (ref b; __repr.__buffer) 629 { 630 if ( pos >= b.length ) 631 { 632 pos -= b.length; 633 cp += b.length; 634 continue; 635 } 636 auto i = b[pos..$].countUntil(needle); 637 pos = 0; 638 if (i>=0) 639 { 640 debug(requests) tracef("found at %d", i+cp); 641 return cast(int)i+cp; 642 } 643 cp += b.length; 644 } 645 return -1; 646 } 647 Buffer!T opSlice(size_t m, size_t n) { 648 if ( empty || m == n ) { 649 return Buffer!T(); 650 } 651 assert( m <= n && n <= __repr.__length); 652 auto res = this.save(); 653 res.popBackN(res.__repr.__length-n); 654 res.popFrontN(m); 655 return res; 656 } 657 @property auto data(U=T[])() pure { 658 static if ( is(U==T[]) ) { 659 if ( __repr && __repr.__buffer && __repr.__buffer.length == 1 ) { 660 return __repr.__buffer.front; 661 } 662 } 663 Appender!(T[]) a; 664 if ( __repr && __repr.__buffer ) { 665 foreach(ref b; __repr.__buffer) { 666 a.put(b); 667 } 668 } 669 static if ( is(U==T[]) ) { 670 return a.data; 671 } else { 672 return cast(U)a.data; 673 } 674 } 675 string opCast(string)() { 676 return this.toString; 677 } 678 bool opEquals(U)(U x) { 679 return cast(U)this == x; 680 } 681 682 } 683 /// 684 public unittest { 685 686 static assert(isInputRange!(Buffer!ubyte)); 687 static assert(isForwardRange!(Buffer!ubyte)); 688 static assert(hasLength!(Buffer!ubyte)); 689 static assert(hasSlicing!(Buffer!ubyte)); 690 static assert(isBidirectionalRange!(Buffer!ubyte)); 691 static assert(isRandomAccessRange!(Buffer!ubyte)); 692 693 auto b = Buffer!ubyte(); 694 b.put("abc".representation.dup); 695 b.put("def".representation.dup); 696 assert(b.length == 6); 697 assert(b.toString == "abcdef"); 698 assert(b.front == 'a'); 699 assert(b.back == 'f'); 700 assert(equal(b[0..$], "abcdef")); 701 assert(equal(b[$-2..$], "ef")); 702 assert(b == "abcdef"); 703 b.popFront; 704 b.popBack; 705 assert(b.front == 'b'); 706 assert(b.back == 'e'); 707 assert(b.length == 4); 708 assert(retro(b).front == 'e'); 709 assert(countUntil(b, 'e') == 3); 710 assert(equal(splitter(b, 'c').array[1], ['d', 'e'])); // split "bcde" on 'c' 711 assert(equal(b, "bcde")); 712 b.popFront; b.popFront; 713 assert(b.front == 'd'); 714 assert(b.front == b[0]); 715 assert(b.back == b[$-1]); 716 717 auto c = Buffer!ubyte(); 718 c.put("Header0: value0\n".representation.dup); 719 c.put("Header1: value1\n".representation.dup); 720 c.put("Header2: value2\n\nbody".representation.dup); 721 auto c_length = c.length; 722 auto eoh = countUntil(c, "\n\n"); 723 assert(eoh == 47); 724 // foreach(header; c[0..eoh].splitter('\n') ) { 725 // writeln(cast(string)header.data); 726 // } 727 assert(equal(findSplit(c, "\n\n")[2], "body")); 728 assert(c.length == c_length); 729 assert(c.indexOf('\n', 0) == 15); 730 assert(c.indexOf('\n', 16) == 31); 731 assert(c.indexOf('X', 16) == -1); 732 } 733 734 public struct SSLOptions { 735 enum filetype { 736 pem, 737 asn1, 738 der = asn1, 739 } 740 private { 741 /** 742 * do we need to veryfy peer? 743 */ 744 bool _verifyPeer = true; 745 /** 746 * path to CA cert 747 */ 748 string _caCert; 749 /** 750 * path to key file (can also contain cert (for pem) 751 */ 752 string _keyFile; 753 /** 754 * path to cert file (can also contain key (for pem) 755 */ 756 string _certFile; 757 filetype _keyType = filetype.pem; 758 filetype _certType = filetype.pem; 759 } 760 ubyte haveFiles() pure nothrow @safe @nogc { 761 ubyte r = 0; 762 if ( _keyFile ) r|=1; 763 if ( _certFile ) r|=2; 764 return r; 765 } 766 // do we want to verify peer certificates? 767 bool getVerifyPeer() pure nothrow @nogc { 768 return _verifyPeer; 769 } 770 SSLOptions setVerifyPeer(bool v) pure nothrow @nogc @safe { 771 _verifyPeer = v; 772 return this; 773 } 774 /// set key file name and type (default - pem) 775 auto setKeyFile(string f, filetype t = filetype.pem) @safe pure nothrow @nogc { 776 _keyFile = f; 777 _keyType = t; 778 return this; 779 } 780 auto getKeyFile() @safe pure nothrow @nogc { 781 return _keyFile; 782 } 783 auto getKeyType() @safe pure nothrow @nogc { 784 return _keyType; 785 } 786 /// set cert file name and type (default - pem) 787 auto setCertFile(string f, filetype t = filetype.pem) @safe pure nothrow @nogc { 788 _certFile = f; 789 _certType = t; 790 return this; 791 } 792 auto setCaCert(string p) @safe pure nothrow @nogc { 793 _caCert = p; 794 return this; 795 } 796 auto getCaCert() @safe pure nothrow @nogc { 797 return _caCert; 798 } 799 auto getCertFile() @safe pure nothrow @nogc { 800 return _certFile; 801 } 802 auto getCertType() @safe pure nothrow @nogc { 803 return _certType; 804 } 805 /// set key file type 806 void setKeyType(string t) @safe pure nothrow { 807 _keyType = cast(filetype)sslKeyTypes[t]; 808 } 809 /// set cert file type 810 void setCertType(string t) @safe pure nothrow { 811 _certType = cast(filetype)sslKeyTypes[t]; 812 } 813 } 814 static immutable int[string] sslKeyTypes; 815 shared static this() { 816 sslKeyTypes = [ 817 "pem":SSLOptions.filetype.pem, 818 "asn1":SSLOptions.filetype.asn1, 819 "der":SSLOptions.filetype.der, 820 ]; 821 } 822 823 version(vibeD) { 824 } 825 else { 826 extern(C) { 827 int SSL_library_init(); 828 } 829 830 enum SSL_VERIFY_PEER = 0x01; 831 enum SSL_FILETYPE_PEM = 1; 832 enum SSL_FILETYPE_ASN1 = 2; 833 834 immutable int[SSLOptions.filetype] ft2ssl; 835 836 shared static this() { 837 ft2ssl = [ 838 SSLOptions.filetype.pem: SSL_FILETYPE_PEM, 839 SSLOptions.filetype.asn1: SSL_FILETYPE_ASN1, 840 SSLOptions.filetype.der: SSL_FILETYPE_ASN1 841 ]; 842 } 843 844 public class OpenSslSocket : Socket { 845 //enum SSL_MODE_RELEASE_BUFFERS = 0x00000010L; 846 private SSL* ssl; 847 private SSL_CTX* ctx; 848 849 private void initSsl(SSLOptions opts) { 850 //ctx = SSL_CTX_new(SSLv3_client_method()); 851 ctx = openssl.SSL_CTX_new(openssl.TLS_method()); 852 assert(ctx !is null); 853 if ( opts.getVerifyPeer() ) { 854 openssl.SSL_CTX_set_default_verify_paths(ctx); 855 if ( opts.getCaCert() ) { 856 openssl.SSL_CTX_load_verify_locations(ctx, cast(char*)opts.getCaCert().toStringz(), cast(char*)null); 857 } 858 openssl.SSL_CTX_set_verify(ctx, SSL_VERIFY_PEER, null); 859 } 860 immutable keyFile = opts.getKeyFile(); 861 immutable keyType = opts.getKeyType(); 862 immutable certFile = opts.getCertFile(); 863 immutable certType = opts.getCertType(); 864 final switch(opts.haveFiles()) { 865 case 0b11: // both files 866 openssl.SSL_CTX_use_PrivateKey_file(ctx, keyFile.toStringz(), ft2ssl[keyType]); 867 openssl.SSL_CTX_use_certificate_file(ctx, certFile.toStringz(),ft2ssl[certType]); 868 break; 869 case 0b01: // key only 870 openssl.SSL_CTX_use_PrivateKey_file(ctx, keyFile.toStringz(), ft2ssl[keyType]); 871 openssl.SSL_CTX_use_certificate_file(ctx, keyFile.toStringz(), ft2ssl[keyType]); 872 break; 873 case 0b10: // cert only 874 openssl.SSL_CTX_use_PrivateKey_file(ctx, certFile.toStringz(), ft2ssl[certType]); 875 openssl.SSL_CTX_use_certificate_file(ctx, certFile.toStringz(), ft2ssl[certType]); 876 break; 877 case 0b00: 878 break; 879 } 880 //SSL_CTX_set_mode(ctx, SSL_MODE_RELEASE_BUFFERS); 881 //SSL_CTX_ctrl(ctx, 33, SSL_MODE_RELEASE_BUFFERS, null); 882 ssl = openssl.SSL_new(ctx); 883 openssl.SSL_set_fd(ssl, cast(int)this.handle); 884 } 885 886 @trusted 887 override void connect(Address dest) { 888 super.connect(dest); 889 if(openssl.SSL_connect(ssl) == -1) { 890 throw new Exception("ssl connect failed: %s".format(to!string(openssl.ERR_reason_error_string(openssl.ERR_get_error())))); 891 } 892 } 893 auto connectSSL() { 894 if(openssl.SSL_connect(ssl) == -1) { 895 throw new Exception("ssl connect failed: %s".format(to!string(openssl.ERR_reason_error_string(openssl.ERR_get_error())))); 896 } 897 debug(requests) tracef("ssl socket connected"); 898 return this; 899 } 900 @trusted 901 override ptrdiff_t send(scope const(void)[] buf, SocketFlags flags) scope { 902 return openssl.SSL_write(ssl, buf.ptr, cast(uint) buf.length); 903 } 904 override ptrdiff_t send(scope const(void)[] buf) scope { 905 return send(buf, SocketFlags.NONE); 906 } 907 @trusted 908 override ptrdiff_t receive(scope void[] buf, SocketFlags flags) scope { 909 return openssl.SSL_read(ssl, buf.ptr, cast(int)buf.length); 910 } 911 override ptrdiff_t receive(scope void[] buf) scope { 912 return receive(buf, SocketFlags.NONE); 913 } 914 this(AddressFamily af, SocketType type = SocketType.STREAM, SSLOptions opts = SSLOptions()) { 915 super(af, type); 916 initSsl(opts); 917 } 918 this(socket_t sock, AddressFamily af, SSLOptions opts = SSLOptions()) { 919 super(sock, af); 920 initSsl(opts); 921 } 922 override void close() scope { 923 super.close(); 924 if ( ssl !is null ) { 925 openssl.SSL_free(ssl); 926 ssl = null; 927 } 928 if ( ctx !is null ) { 929 openssl.SSL_CTX_free(ctx); 930 ctx = null; 931 } 932 } 933 void SSL_set_tlsext_host_name(string host) { 934 935 } 936 } 937 938 public class SSLSocketStream: SocketStream { 939 private SSLOptions _sslOptions; 940 private Socket underlyingSocket; 941 private SSL* ssl; 942 private string host; 943 944 this(SSLOptions opts) { 945 _sslOptions = opts; 946 } 947 this(NetworkStream ostream, SSLOptions opts, string host = null) { 948 _sslOptions = opts; 949 this.host = host; 950 auto osock = ostream.so(); 951 underlyingSocket = osock; 952 osock.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1); 953 auto ss = new OpenSslSocket(osock.handle, osock.addressFamily, _sslOptions); 954 ssl = ss.ssl; 955 if ( host !is null ) { 956 openssl.SSL_set_tlsext_host_name(ssl, toStringz(host)); 957 } 958 ss.connectSSL(); 959 __isOpen = true; 960 __isConnected = true; 961 s = ss; 962 debug(requests) tracef("ssl stream created from another stream: %s", s); 963 } 964 override void close() { 965 ssl = null; 966 host = null; 967 super.close(); 968 if ( underlyingSocket ) { 969 underlyingSocket.close(); 970 } 971 } 972 override void open(AddressFamily fa) { 973 if ( s !is null ) { 974 s.close(); 975 } 976 auto ss = new OpenSslSocket(fa, SocketType.STREAM, _sslOptions); 977 assert(ss !is null, "Can't create socket"); 978 ssl = ss.ssl; 979 if ( host !is null ) { 980 openssl.SSL_set_tlsext_host_name(ssl, toStringz(host)); 981 } 982 s = ss; 983 s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1); 984 __isOpen = true; 985 } 986 override SocketStream connect(string h, ushort p, Duration timeout = 10.seconds) { 987 host = h; 988 return super.connect(h, p, timeout); 989 } 990 override SSLSocketStream accept() { 991 auto newso = s.accept(); 992 if ( s is null ) { 993 return null; 994 } 995 auto newstream = new SSLSocketStream(_sslOptions); 996 auto sslSocket = new OpenSslSocket(newso.handle, s.addressFamily); 997 newstream.s = sslSocket; 998 newstream.__isOpen = true; 999 newstream.__isConnected = true; 1000 return newstream; 1001 } 1002 } 1003 public class TCPSocketStream : SocketStream { 1004 override void open(AddressFamily fa) { 1005 if ( s !is null ) { 1006 s.close(); 1007 } 1008 s = new Socket(fa, SocketType.STREAM, ProtocolType.TCP); 1009 assert(s !is null, "Can't create socket"); 1010 __isOpen = true; 1011 s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1); 1012 } 1013 override TCPSocketStream accept() { 1014 auto newso = s.accept(); 1015 if ( s is null ) { 1016 return null; 1017 } 1018 auto newstream = new TCPSocketStream(); 1019 newstream.s = newso; 1020 newstream.__isOpen = true; 1021 newstream.__isConnected = true; 1022 newstream.s.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1); 1023 return newstream; 1024 } 1025 } 1026 } 1027 1028 public interface NetworkStream { 1029 @property bool isConnected() const; 1030 @property bool isOpen() const; 1031 1032 void close() @trusted; 1033 1034 /// 1035 /// timeout is the socket write timeout. 1036 /// 1037 NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds); 1038 1039 ptrdiff_t send(const(void)[] buff); 1040 ptrdiff_t receive(void[] buff); 1041 1042 NetworkStream accept(); 1043 @property void reuseAddr(bool); 1044 void bind(string); 1045 void bind(Address); 1046 void listen(int); 1047 version(vibeD) { 1048 TCPConnection so(); 1049 } else { 1050 Socket so(); 1051 } 1052 /// 1053 /// Set timeout for receive calls. 0 means no timeout. 1054 /// 1055 @property void readTimeout(Duration timeout); 1056 } 1057 1058 public abstract class SocketStream : NetworkStream { 1059 private { 1060 Duration timeout; 1061 Socket s; 1062 bool __isOpen; 1063 bool __isConnected; 1064 string _bind; 1065 } 1066 void open(AddressFamily fa) { 1067 } 1068 @property Socket so() @safe pure { 1069 return s; 1070 } 1071 @property bool isOpen() @safe @nogc pure const { 1072 return s && __isOpen; 1073 } 1074 @property bool isConnected() @safe @nogc pure const { 1075 return s && __isOpen && __isConnected; 1076 } 1077 void close() @trusted { 1078 // can be callsed from dtor in GC, no gc calls here 1079 if ( isOpen ) { 1080 s.close(); 1081 __isOpen = false; 1082 __isConnected = false; 1083 } 1084 s = null; 1085 } 1086 /*** 1087 * bind() just remember address. We will cal bind() at the time of connect as 1088 * we can have several connection trials. 1089 ***/ 1090 override void bind(string to) { 1091 _bind = to; 1092 } 1093 /*** 1094 * Make connection to remote site. Bind, handle connection error, try several addresses, etc 1095 ***/ 1096 SocketStream connect(string host, ushort port, Duration timeout = 10.seconds) { 1097 debug(requests) tracef(format("Create connection to %s:%d", host, port)); 1098 Address[] addresses; 1099 __isConnected = false; 1100 try { 1101 addresses = getAddress(host, port); 1102 } catch (Exception e) { 1103 throw new ConnectError("Can't resolve name when connect to %s:%d: %s".format(host, port, e.msg)); 1104 } 1105 foreach(a; addresses) { 1106 debug(requests) tracef("Trying %s", a); 1107 try { 1108 open(a.addressFamily); 1109 if ( _bind !is null ) { 1110 auto ad = getAddress(_bind); 1111 debug(requests) tracef("bind to %s", ad[0]); 1112 s.bind(ad[0]); 1113 } 1114 s.setOption(SocketOptionLevel.SOCKET, SocketOption.SNDTIMEO, timeout); 1115 s.connect(a); 1116 debug(requests) tracef("Connected to %s", a); 1117 __isConnected = true; 1118 break; 1119 } catch (SocketException e) { 1120 debug(requests) warningf("Failed to connect to %s:%d(%s): %s", host, port, a, e.msg); 1121 s.close(); 1122 } 1123 } 1124 if ( !__isConnected ) { 1125 throw new ConnectError("Can't connect to %s:%d".format(host, port)); 1126 } 1127 return this; 1128 } 1129 1130 ptrdiff_t send(const(void)[] buff) 1131 in {assert(isConnected);} 1132 do { 1133 auto rc = s.send(buff); 1134 if (rc < 0) { 1135 close(); 1136 throw new NetworkException("sending data: %s".format(to!string(strerror(errno)))); 1137 } 1138 return rc; 1139 } 1140 1141 ptrdiff_t receive(void[] buff) { 1142 while (true) { 1143 auto r = s.receive(buff); 1144 if (r < 0) { 1145 auto e = errno; 1146 version(Windows) { 1147 close(); 1148 if ( e == 0 ) { 1149 throw new TimeoutException("Timeout receiving data"); 1150 } 1151 throw new NetworkException("Unexpected error %s while receiving data".format(to!string(strerror(errno)))); 1152 } 1153 version(Posix) { 1154 if ( e == EINTR ) { 1155 continue; 1156 } 1157 close(); 1158 if ( e == EAGAIN ) { 1159 throw new TimeoutException("Timeout receiving data"); 1160 } 1161 throw new NetworkException("Unexpected error %s while receiving data".format(to!string(strerror(errno)))); 1162 } 1163 } 1164 else { 1165 buff.length = r; 1166 } 1167 return r; 1168 } 1169 assert(false); 1170 } 1171 1172 @property void readTimeout(Duration timeout) @safe { 1173 if ( __isConnected ) 1174 { 1175 s.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, timeout); 1176 } 1177 } 1178 override SocketStream accept() { 1179 assert(false, "Implement before use"); 1180 } 1181 @property override void reuseAddr(bool yes){ 1182 if (yes) { 1183 s.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 1); 1184 } 1185 else { 1186 s.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 0); 1187 } 1188 } 1189 override void bind(Address addr){ 1190 s.bind(addr); 1191 } 1192 override void listen(int n) { 1193 s.listen(n); 1194 }; 1195 } 1196 1197 version (vibeD) { 1198 import vibe.core.net, vibe.stream.tls; 1199 1200 public class TCPVibeStream : NetworkStream { 1201 private: 1202 TCPConnection _conn; 1203 Duration _readTimeout = Duration.max; 1204 bool _isOpen = true; 1205 string _bind; 1206 1207 public: 1208 @property bool isConnected() const { 1209 return _conn.connected; 1210 } 1211 @property override bool isOpen() const { 1212 return _conn && _isOpen; 1213 } 1214 void close() @trusted { 1215 _conn.close(); 1216 _isOpen = false; 1217 } 1218 override TCPConnection so() { 1219 return _conn; 1220 } 1221 override void bind(string to) { 1222 _bind = to; 1223 } 1224 NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) { 1225 // FIXME: timeout not supported in vibe.d 1226 try { 1227 _conn = connectTCP(host, port, _bind); 1228 } 1229 catch (Exception e) 1230 throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e); 1231 1232 return this; 1233 } 1234 1235 ptrdiff_t send(const(void)[] buff) { 1236 _conn.write(cast(const(ubyte)[])buff); 1237 return buff.length; 1238 } 1239 1240 ptrdiff_t receive(void[] buff) { 1241 if (!_conn.waitForData(_readTimeout)) { 1242 if (!_conn.connected || _conn.empty ) { 1243 return 0; 1244 } 1245 throw new TimeoutException("Timeout receiving data"); 1246 } 1247 1248 if(_conn.empty) { 1249 return 0; 1250 } 1251 1252 auto chunk = min(_conn.leastSize, buff.length); 1253 assert(chunk != 0); 1254 _conn.read(cast(ubyte[])buff[0 .. chunk]); 1255 return chunk; 1256 } 1257 1258 @property void readTimeout(Duration timeout) { 1259 if (timeout == 0.seconds) { 1260 _readTimeout = Duration.max; 1261 } 1262 else { 1263 _readTimeout = timeout; 1264 } 1265 } 1266 override TCPVibeStream accept() { 1267 assert(false, "Must be implemented"); 1268 } 1269 override @property void reuseAddr(bool){ 1270 assert(false, "Not Implemented"); 1271 } 1272 override void bind(Address){ 1273 assert(false, "Not Implemented"); 1274 } 1275 override void listen(int){ 1276 assert(false, "Not Implemented"); 1277 } 1278 } 1279 1280 public class SSLVibeStream : TCPVibeStream { 1281 private: 1282 TLSStream _sslStream; 1283 bool _isOpen = true; 1284 SSLOptions _sslOptions; 1285 TCPConnection underlyingConnection; 1286 1287 void connectSSL(string host) { 1288 auto sslctx = createTLSContext(TLSContextKind.client); 1289 if ( _sslOptions.getVerifyPeer() ) { 1290 if ( _sslOptions.getCaCert() == null ) { 1291 throw new ConnectError("With vibe.d you have to call setCaCert() before verify server certificate."); 1292 } 1293 sslctx.useTrustedCertificateFile(_sslOptions.getCaCert()); 1294 sslctx.peerValidationMode = TLSPeerValidationMode.trustedCert; 1295 } else { 1296 sslctx.peerValidationMode = TLSPeerValidationMode.none; 1297 } 1298 immutable keyFile = _sslOptions.getKeyFile(); 1299 immutable certFile = _sslOptions.getCertFile(); 1300 final switch(_sslOptions.haveFiles()) { 1301 case 0b11: // both files 1302 sslctx.usePrivateKeyFile(keyFile); 1303 sslctx.useCertificateChainFile(certFile); 1304 break; 1305 case 0b01: // key only 1306 sslctx.usePrivateKeyFile(keyFile); 1307 sslctx.useCertificateChainFile(keyFile); 1308 break; 1309 case 0b10: // cert only 1310 sslctx.usePrivateKeyFile(certFile); 1311 sslctx.useCertificateChainFile(certFile); 1312 break; 1313 case 0b00: 1314 break; 1315 } 1316 _sslStream = createTLSStream(_conn, sslctx, host); 1317 } 1318 1319 public: 1320 this(SSLOptions opts) { 1321 _sslOptions = opts; 1322 } 1323 override TCPConnection so() { 1324 return _conn; 1325 } 1326 this(NetworkStream ostream, SSLOptions opts, string host = null) { 1327 _sslOptions = opts; 1328 auto oconn = ostream.so(); 1329 underlyingConnection = oconn; 1330 _conn = oconn; 1331 connectSSL(host); 1332 } 1333 override NetworkStream connect(string host, ushort port, Duration timeout = 10.seconds) { 1334 try { 1335 _conn = connectTCP(host, port); 1336 connectSSL(host); 1337 } 1338 catch (ConnectError e) { 1339 throw e; 1340 } 1341 catch (Exception e) { 1342 throw new ConnectError("Can't connect to %s:%d".format(host, port), __FILE__, __LINE__, e); 1343 } 1344 1345 return this; 1346 } 1347 1348 override ptrdiff_t send(const(void)[] buff) { 1349 _sslStream.write(cast(const(ubyte)[])buff); 1350 return buff.length; 1351 } 1352 1353 override ptrdiff_t receive(void[] buff) { 1354 if (!_sslStream.dataAvailableForRead) { 1355 if (!_conn.waitForData(_readTimeout)) { 1356 if (!_conn.connected) { 1357 return 0; 1358 } 1359 throw new TimeoutException("Timeout receiving data"); 1360 } 1361 } 1362 1363 if(_sslStream.empty) { 1364 return 0; 1365 } 1366 1367 auto chunk = min(_sslStream.leastSize, buff.length); 1368 assert(chunk != 0); 1369 _sslStream.read(cast(ubyte[])buff[0 .. chunk]); 1370 return chunk; 1371 } 1372 1373 override void close() @trusted { 1374 if ( _sslStream ) 1375 { 1376 _sslStream.finalize(); 1377 } 1378 _conn.close(); 1379 _isOpen = false; 1380 } 1381 @property override bool isOpen() const { 1382 return _conn && _isOpen; 1383 } 1384 override SSLVibeStream accept() { 1385 assert(false, "Must be implemented"); 1386 } 1387 override @property void reuseAddr(bool){ 1388 assert(false, "Not Implemented"); 1389 } 1390 override void bind(Address){ 1391 assert(false, "Not Implemented"); 1392 } 1393 override void listen(int){ 1394 assert(false, "Not Implemented"); 1395 } 1396 } 1397 } 1398 1399 version (vibeD) { 1400 public alias TCPStream = TCPVibeStream; 1401 public alias SSLStream = SSLVibeStream; 1402 } 1403 else { 1404 public alias TCPStream = TCPSocketStream; 1405 public alias SSLStream = SSLSocketStream; 1406 }