module requests.ftp;

private:
import std.ascii;
import std.algorithm;
import std.conv;
import std.datetime;
import std.format;
import std.exception;
import std.string;
import std.range;
import std.experimental.logger;
import std.stdio;
import std.path;
import std.traits;
import std.typecons;

import requests.uri;
import requests.utils;
import requests.streams;
import requests.base;
import requests.request;
import requests.connmanager;
import requests.rangeadapter;

/// Thrown when a reply cannot be read from (or is malformed on) the FTP control channel.
public class FTPServerResponseError: Exception {
    this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
        super(message, file, line, next);
    }
}

/// Response object returned by FTP requests; adds nothing to the base Response.
public class FTPResponse : Response {
}

/// Plain username/password credentials used for the FTP USER/PASS login.
public class FtpAuthentication: Auth {
    private {
        string   _username, _password;
    }
    /// Constructor.
    /// Params:
    /// username = username
    /// password = password
    ///
    this(string username, string password) {
        _username = username;
        _password = password;
    }
    /// Returns: the username given to the constructor.
    override string userName() {
        return _username;
    }
    /// Returns: the password given to the constructor.
    override string password() {
        return _password;
    }
    /// FTP does not use headers; always returns null.
    override string[string] authHeaders(string domain) {
        return null;
    }
}

enum defaultBufferSize = 8192;

/// FTP client request: uploads (post), downloads (get) and directory listings (list)
/// over a passive-mode data connection.
public struct FTPRequest {
    private {
        URI           _uri;
        Duration      _timeout = 60.seconds;
        uint          _verbosity = 0;
        size_t        _bufferSize = defaultBufferSize;
        // `5L` forces 64-bit arithmetic: `5*1024*1024*1024` is evaluated as
        // `int` and silently wraps around to 1 GiB.
        long          _maxContentLength = 5L*1024*1024*1024;
        long          _contentLength = -1;
        long          _contentReceived;
        NetworkStream _controlChannel;
        string[]      _responseHistory;
        FTPResponse   _response;
        bool          _useStreaming;
        Auth          _authenticator;
        string        _method;
        string        _proxy;
        string        _bind;
        RefCounted!ConnManager      _cm;
        InputRangeAdapter           _postData;
    }
    mixin(Getter_Setter!Duration("timeout"));
    mixin(Getter_Setter!uint("verbosity"));
    mixin(Getter_Setter!size_t("bufferSize"));
    mixin(Getter_Setter!long("maxContentLength"));
    mixin(Getter_Setter!bool("useStreaming"));
    mixin(Getter("contentLength"));
    mixin(Getter("contentReceived"));
    mixin(Setter!Auth("authenticator"));
    mixin(Getter_Setter!string("proxy"));
    mixin(Getter_Setter!string("bind"));

    /// Returns: every control-channel reply recorded so far, oldest first.
    @property final string[] responseHistory() @safe @nogc nothrow {
        return _responseHistory;
    }
    /// Create a request for the given URL string.
    this(string uri) {
        _uri = URI(uri);
    }

    /// Create a request for an already parsed URI.
    this(in URI uri) {
        _uri = uri;
    }

    ~this() {
        //if ( _controlChannel ) {
        //    _controlChannel.close();
        //}
    }
    string toString() const {
        return "FTPRequest(%s, %s)".format(_method, _uri.uri());
    }
    /// Render request properties according to `fmt`.
    ///
    /// Supported specs: %h host, %m method, %p port, %P path, %q query, %U full URI.
    /// Throws: FormatException on any other spec.
    string format(string fmt) const {
        import std.array;
        import std.stdio;
        auto a = appender!string();
        auto f = FormatSpec!char(fmt);
        while (f.writeUpToNextSpec(a)) {
            switch(f.spec) {
                case 'h':
                    // Remote hostname.
                    a.put(_uri.host);
                    break;
                case 'm':
                    // method.
                    a.put(_method);
                    break;
                case 'p':
                    // Remote port.
                    a.put("%d".format(_uri.port));
                    break;
                case 'P':
                    // Path
                    a.put(_uri.path);
                    break;
                case 'q':
                    // query parameters supplied with url.
                    a.put(_uri.query);
                    break;
                case 'U':
                    a.put(_uri.uri());
                    break;
                default:
                    throw new FormatException("Unknown Request format spec " ~ f.spec);
            }
        }
        return a.data();
    }
    /// Send one command over the control channel and wait for the reply.
    ///
    /// The reply line is appended to the response history.
    /// Params:
    ///   cmd = complete command line, including the trailing "\r\n"
    ///   __controlChannel = stream to talk to
    /// Returns: numeric reply code.
    ushort sendCmdGetResponse(string cmd, NetworkStream __controlChannel) {
        // NOTE(review): `cmd` is built from URI parts by the callers; nothing
        // here rejects embedded CR/LF - confirm the URI parser filters them.
        debug(requests) tracef("cmd to server: %s", cmd.strip);
        if ( _verbosity >=1 ) {
            writefln("> %s", cmd.strip);
        }
        __controlChannel.send(cmd);
        string response = serverResponse(__controlChannel);
        _responseHistory ~= response;
        return responseToCode(response);
    }

    /// Extract the three-digit reply code from a reply line.
    /// Throws: FTPServerResponseError when the line is shorter than three
    ///         characters; ConvException when the prefix is not a number.
    ushort responseToCode(string response) pure const @safe {
        if ( response.length < 3 ) {
            throw new FTPServerResponseError("Failed to read server responce over control channel");
        }
        return to!ushort(response[0..3]);
    }

    /// Switch to a new URL, closing the control channel when host, port or
    /// username differ from the current ones.
    void handleChangeURI(in string uri) @safe {
        // if control channel exists and new URL not match old, then close
        URI newURI = URI(uri);
        if ( _controlChannel &&
            (newURI.host != _uri.host || newURI.port != _uri.port || newURI.username != _uri.username)) {
            _controlChannel.close();
            _controlChannel = null;
        }
        _uri = newURI;
    }

    /// Read from the control channel until a final reply line arrives.
    ///
    /// A final line is three digits followed by a space; continuation lines
    /// of a multi-line reply are skipped (and echoed when verbosity >= 1).
    /// Returns: the final reply line without the line terminator.
    /// Throws: FTPServerResponseError on read failure, on connection close,
    ///         or when no final line is seen within the 16 KiB limit.
    string serverResponse(NetworkStream __controlChannel) {
        if ( __controlChannel is null ) {
            throw new FTPServerResponseError("Failed to read server responce over control channel");
        }
        string buffer;
        size_t linesPrinted;
        immutable bufferLimit = 16*1024;
        __controlChannel.readTimeout = _timeout;
        scope(exit) {
            __controlChannel.readTimeout = 0.seconds;
        }
        debug(requests) trace("Wait on control channel");
        // read byte by byte so nothing past the reply line is consumed
        auto b = new ubyte[1];
        while ( __controlChannel.isConnected && buffer.length < bufferLimit ) {
            ptrdiff_t rc;
            try {
                rc = __controlChannel.receive(b);
            }
            catch (Exception e) {
                error("Failed to read response from server");
                throw new FTPServerResponseError("Failed to read server responce over control channel", __FILE__, __LINE__, e);
            }
            //debug(requests) tracef("Got %d bytes from control socket", rc);
            if ( rc <= 0 ) {
                // 0 means the peer closed; a negative value must not reach the slice below
                error("Failed to read response from server");
                throw new FTPServerResponseError("Failed to read server responce over control channel", __FILE__, __LINE__);
            }
            buffer ~= b[0..rc];
            if ( buffer.endsWith('\n') ){
                auto lines = buffer.splitLines;
                if ( _verbosity >= 1 ) {
                    // echo only lines that have not been shown yet
                    foreach(l; lines[linesPrinted..$]) {
                        writefln("< %s", l);
                    }
                    linesPrinted = lines.length;
                }
                auto responseLines = lines.
                    filter!(l => l.length>3 && l[3]==' ' && l[0..3].all!isDigit);
                if ( !responseLines.empty ) {
                    return responseLines.front;
                }
            }
        }
        throw new FTPServerResponseError("Failed to read server responce over control channel");
    }
    /// Enter every directory in `path[1..$]`, creating the missing ones.
    ///
    /// `path[0]` stands for the directory we are already in.
    /// Returns: reply code of the last command sent; 3xx and above means failure.
    ushort tryCdOrCreatePath(string[] path) {
        /*
         * At start we stay at original path, we have to create next path element
         * For example:
         * path = ["", "a", "b"] - we stay in root (path[0]), we have to cd and return ok
         * or try to create "a" and cd to "a".
         */
        debug(requests) info("Trying to create path %s".format(path));
        enforce(path.length>=2, "You called tryCdOrCreate, but there is nothing to create: %s".format(path));
        ushort code;
        foreach(next_dir; path[1..$]) {
            code = sendCmdGetResponse("CWD " ~ next_dir ~ "\r\n", _controlChannel);
            if ( code >= 300 ) {
                // try to create, then again CWD
                code = sendCmdGetResponse("MKD " ~ next_dir ~ "\r\n", _controlChannel);
                if ( code >= 300 ) {
                    return code;
                }
                code = sendCmdGetResponse("CWD " ~ next_dir ~ "\r\n", _controlChannel);
                if ( code >= 300 ) {
                    // stop here, otherwise the rest would be created in the wrong directory
                    return code;
                }
            }
        }
        return code;
    }

    /// Obtain a logged-in control channel in `_controlChannel`.
    ///
    /// A pooled connection is reused as is; otherwise a new one is opened,
    /// registered in the pool, and the USER/PASS login is performed.
    /// Returns: true when the channel is ready; false when the server refused
    ///          the connection or the login (`_response.code` is set then).
    private bool establishControlChannel() {
        _controlChannel = _cm.get(_uri.scheme, _uri.host, _uri.port);
        if ( _controlChannel ) {
            _response._connectedAt = Clock.currTime;
            return true;
        }

        _controlChannel = new TCPStream();
        _controlChannel.bind(_bind);
        _controlChannel.connect(_uri.host, _uri.port, _timeout);
        // NOTE(review): the connection is pooled before the login succeeds, so
        // a refused login leaves it available for reuse - confirm this is intended.
        if ( auto purged_connection = _cm.put(_uri.scheme, _uri.host, _uri.port, _controlChannel) )
        {
            debug(requests) tracef("closing purged connection %s", purged_connection);
            purged_connection.close();
        }
        _response._connectedAt = Clock.currTime;

        auto greeting = serverResponse(_controlChannel);
        _responseHistory ~= greeting;
        ushort code = responseToCode(greeting);
        debug(requests) tracef("Server initial response: %s", greeting);
        if ( code/100 > 2 ) {
            _response.code = code;
            return false;
        }
        // Log in
        string user, pass;
        if ( _authenticator ) {
            user = _authenticator.userName();
            pass = _authenticator.password();
        }
        else{
            user = _uri.username.length ? _uri.username : "anonymous";
            pass = _uri.password.length ? _uri.password : "requests@";
        }
        debug(requests) tracef("Use %s:%s%s as username:password", user,
            pass.length ? pass[0..1] : "", replicate("-", pass.length ? pass.length-1 : 0));

        code = sendCmdGetResponse("USER " ~ user ~ "\r\n", _controlChannel);
        if ( code/100 > 3 ) {
            _response.code = code;
            return false;
        }
        if ( code/100 == 3 ) {
            code = sendCmdGetResponse("PASS " ~ pass ~ "\r\n", _controlChannel);
            if ( code/100 > 2 ) {
                _response.code = code;
                return false;
            }
        }
        return true;
    }

    /// Ask the server for the current directory (PWD).
    /// Returns: the directory, or null when the server did not answer with 2xx.
    private string queryWorkingDirectory() {
        auto code = sendCmdGetResponse("PWD\r\n", _controlChannel);
        if ( code/100 != 2 ) {
            return null;
        }
        // like '257 "/home/testuser"'
        auto reply = _responseHistory[$-1];
        auto first = reply.indexOf('"');
        auto last = reply.lastIndexOf('"');
        if ( first >= 0 && last > first ) {
            // take everything between the quotes so names with spaces survive;
            // a doubled quote stands for a literal quote
            return reply[first+1..last].replace(`""`, `"`);
        }
        auto a = reply.split();
        if ( a.length > 1 ) {
            return a[1].chompPrefix(`"`).chomp(`"`);
        }
        return null;
    }

    /// Cut the "h1,h2,h3,h4,p1,p2" part out of a PASV reply such as
    /// "227 Entering Passive Mode (132,180,15,2,210,187)".
    private static string passiveAddress(string reply) {
        return reply.findSplitBefore(")")[0].findSplitAfter("(")[1];
    }

    /// Upload `content` to `uri` (or to the current URI when `uri` is null).
    FTPResponse post(R, A...)(string uri, R content, A args)
        if ( __traits(compiles, cast(ubyte[])content)
        || (rank!R == 2 && isSomeChar!(Unqual!(typeof(content.front.front))))
        || (rank!R == 2 && (is(Unqual!(typeof(content.front.front)) == ubyte)))
        )
    {
        if ( uri ) {
            handleChangeURI(uri);
        }
        _postData = makeAdapter(content);
        return post();
    }

    /// Upload the pending post data with STOR, creating missing directories.
    ///
    /// Returns: the response; `code` holds the last relevant reply code
    ///          (500 when the PASV reply could not be parsed).
    FTPResponse post()
    {
        string response;
        ushort code;

        _response = new FTPResponse;
        _response._startedAt = Clock.currTime;
        _method = "POST";

        scope(exit) {
            _response._finishedAt = Clock.currTime;
        }

        _response.uri = _uri;
        _response.finalURI = _uri;

        if ( !establishControlChannel() ) {
            return _response;
        }

        string pwd = queryWorkingDirectory();
        scope (exit) {
            // go back where we started so a reused connection is in a known state
            if ( pwd && _controlChannel ) {
                sendCmdGetResponse("CWD " ~ pwd ~ "\r\n", _controlChannel);
            }
        }

        auto path = dirName(_uri.path);
        if ( path != "/") {
            path = path.chompPrefix("/");
        }
        code = sendCmdGetResponse("CWD " ~ path ~ "\r\n", _controlChannel);
        if ( code == 550 ) {
            // try to create directory and enter it
            code = tryCdOrCreatePath(dirName(_uri.path).split('/'));
        }
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }

        code = sendCmdGetResponse("PASV\r\n", _controlChannel);
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }
        auto v = passiveAddress(_responseHistory[$-1]);

        TCPStream dataStream;
        try {
            dataStream = connectData(v);
        } catch (FormatException e) {
            error("Failed to parse ", v);
            _response.code = 500;
            return _response;
        }
        scope (exit ) {
            if ( dataStream !is null ) {
                dataStream.close();
            }
        }

        code = sendCmdGetResponse("TYPE I\r\n", _controlChannel);
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }

        code = sendCmdGetResponse("STOR " ~ baseName(_uri.path) ~ "\r\n", _controlChannel);
        if ( code/100 > 1 ) {
            _response.code = code;
            return _response;
        }
        size_t uploaded;
        while ( !_postData.empty ) {
            auto chunk = _postData.front;
            uploaded += chunk.length;
            dataStream.send(chunk);
            _postData.popFront;
        }
        debug(requests) tracef("sent");
        // closing the data connection tells the server the upload is complete
        dataStream.close();
        dataStream = null;
        response = serverResponse(_controlChannel);
        code = responseToCode(response);
        if ( code/100 == 2 ) {
            debug(requests) tracef("Successfully uploaded %d bytes", uploaded);
        }
        _response.code = code;
        return _response;
    }
    /// Parse a PASV address ("h1,h2,h3,h4,p1,p2") and connect a data stream to it.
    ///
    /// Throws: FormatException when the address is malformed (including
    ///         out-of-range or missing numbers).
    private auto connectData(string v)
    {
        string host;
        ushort port;

        // formattedRead consumes its input, keep a copy for the error message
        immutable address = v;
        ubyte a1,a2,a3,a4,p1,p2;
        uint parsed;
        try {
            parsed = formattedRead(v, "%d,%d,%d,%d,%d,%d", &a1, &a2, &a3, &a4, &p1, &p2);
        } catch (ConvException e) {
            // callers only handle FormatException
            throw new FormatException("Malformed PASV address: " ~ address);
        }
        if ( parsed != 6 ) {
            throw new FormatException("Malformed PASV address: " ~ address);
        }
        host = std.format.format("%d.%d.%d.%d", a1, a2, a3, a4);
        port = cast(ushort)((p1<<8) + p2);

        auto dataStream = new TCPStream();
        scope(failure) {
            dataStream.close();
        }
        dataStream.bind(_bind);
        dataStream.connect(host, port, _timeout);
        return dataStream;
    }

    /// Download the file named by `uri` (or by the current URI when null).
    ///
    /// When the server rejects RETR with 5xx the request falls back to NLST.
    /// With `useStreaming` set, the body is delivered through
    /// `receiveAsRange` and `code` is set once the stream is drained.
    /// Throws: RequestException when maxContentLength is exceeded.
    FTPResponse get(string uri = null) {
        return retrieve(uri, "RETR");
    }
    /// List the entry named by `uri` (or by the current URI when null) with LIST,
    /// falling back to NLST on a 5xx reply. See `get` for streaming behaviour.
    FTPResponse list(string uri = null) {
        return retrieve(uri, "LIST");
    }

    /// Common implementation of `get` and `list`.
    /// Params:
    ///   uri = new URL to switch to, or null to keep the current one
    ///   transferCommand = "RETR" or "LIST"
    private FTPResponse retrieve(string uri, string transferCommand) {
        enforce( uri || _uri.host, "FTP URL undefined");
        string response;
        ushort code;

        _response = new FTPResponse;
        _contentReceived = 0;
        // forget the size reported for a previous request
        _contentLength = -1;
        // NOTE(review): list() has always reported "GET" here; kept as is -
        // confirm whether "LIST" was intended.
        _method = "GET";

        _response._startedAt = Clock.currTime;
        scope(exit) {
            _response._finishedAt = Clock.currTime;
        }

        if ( uri ) {
            handleChangeURI(uri);
        }

        _response.uri = _uri;
        _response.finalURI = _uri;

        if ( !establishControlChannel() ) {
            return _response;
        }

        string pwd = queryWorkingDirectory();
        scope (exit) {
            // once streaming is active the read delegate restores the directory itself
            if ( pwd && _controlChannel && !_response._receiveAsRange.activated ) {
                sendCmdGetResponse("CWD " ~ pwd ~ "\r\n", _controlChannel);
            }
        }

        auto path = dirName(_uri.path);
        if ( path != "/") {
            path = path.chompPrefix("/");
        }
        code = sendCmdGetResponse("CWD " ~ path ~ "\r\n", _controlChannel);
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }

        code = sendCmdGetResponse("TYPE I\r\n", _controlChannel);
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }

        code = sendCmdGetResponse("SIZE " ~ baseName(_uri.path) ~ "\r\n", _controlChannel);
        if ( code/100 == 2 ) {
            // something like
            // 213 229355520
            auto s = _responseHistory[$-1].findSplitAfter(" ");
            try {
                _contentLength = to!long(s[1].strip);
            } catch (ConvException) {
                debug(requests) trace("Failed to convert string %s to file size".format(s[1]));
            }
        }

        if ( _maxContentLength > 0 && _contentLength > _maxContentLength ) {
            throw new RequestException("maxContentLength exceeded for ftp data");
        }

        code = sendCmdGetResponse("PASV\r\n", _controlChannel);
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }
        auto v = passiveAddress(_responseHistory[$-1]);

        TCPStream dataStream;
        try{
            dataStream = connectData(v);
        } catch (FormatException e) {
            error("Failed to parse ", v);
            _response.code = 500;
            return _response;
        }
        scope (exit ) {
            // an activated streaming range owns the data stream from then on
            if ( dataStream !is null && !_response._receiveAsRange.activated ) {
                dataStream.close();
            }
        }

        _response._requestSentAt = Clock.currTime;

        code = sendCmdGetResponse(transferCommand ~ " " ~ baseName(_uri.path) ~ "\r\n", _controlChannel);
        if ( code/100 > 1 && code/100 < 5) {
            _response.code = code;
            return _response;
        }
        if ( code/100 == 5) {
            // retry as a name listing over a fresh data connection
            dataStream.close();
            dataStream = null;
            code = sendCmdGetResponse("PASV\r\n", _controlChannel);
            if ( code/100 > 2 ) {
                _response.code = code;
                return _response;
            }
            v = passiveAddress(_responseHistory[$-1]);
            try{
                dataStream = connectData(v);
            } catch (FormatException e) {
                error("Failed to parse ", v);
                _response.code = 500;
                return _response;
            }
            code = sendCmdGetResponse("NLST " ~ _uri.path ~ "\r\n", _controlChannel);
            if ( code/100 > 1 ) {
                _response.code = code;
                return _response;
            }
        }

        dataStream.readTimeout = _timeout;
        while ( true ) {
            auto b = new ubyte[_bufferSize];
            auto rc = dataStream.receive(b);
            if ( rc <= 0 ) {
                debug(requests) trace("done");
                break;
            }
            debug(requests) tracef("got %d bytes from data channel", rc);

            _contentReceived += rc;
            _response._responseBody.putNoCopy(b[0..rc]);

            if ( _maxContentLength && _response._responseBody.length >= _maxContentLength ) {
                throw new RequestException("maxContentLength exceeded for ftp data");
            }
            if ( _useStreaming ) {
                debug(requests) trace("ftp uses streaming");

                // snapshot the settings the delegate needs
                auto __maxContentLength = _maxContentLength;
                auto __contentLength = _contentLength;
                auto __contentReceived = _contentReceived;
                auto __bufferSize = _bufferSize;
                auto __dataStream = dataStream;
                auto __controlChannel = _controlChannel;

                _response._contentLength = _contentLength;
                _response.receiveAsRange.activated = true;
                _response.receiveAsRange.data.length = 0;
                _response.receiveAsRange.data = _response._responseBody.data;
                _response.receiveAsRange.read = delegate ubyte[] () {
                    Buffer!ubyte result;
                    while(true) {
                        // check if we received everything we need
                        if ( __maxContentLength > 0 && __contentReceived >= __maxContentLength )
                        {
                            throw new RequestException("ContentLength > maxContentLength (%d>%d)".
                                format(__contentLength, __maxContentLength));
                        }
                        // have to continue
                        auto b = new ubyte[__bufferSize];
                        ptrdiff_t read;
                        try {
                            read = __dataStream.receive(b);
                        }
                        catch (Exception e) {
                            throw new RequestException("streaming_in error reading from socket", __FILE__, __LINE__, e);
                        }

                        if ( read > 0 ) {
                            _response._contentReceived += read;
                            __contentReceived += read;
                            result.putNoCopy(b[0..read]);
                            return result.data;
                        }
                        if ( read == 0 ) {
                            debug(requests) tracef("streaming_in: server closed connection");
                            __dataStream.close();
                            code = responseToCode(serverResponse(__controlChannel));
                            if ( code/100 == 2 ) {
                                debug(requests) tracef("Successfully received %d bytes", _response._responseBody.length);
                            }
                            _response.code = code;
                            if ( pwd ) {
                                sendCmdGetResponse("CWD " ~ pwd ~ "\r\n", __controlChannel);
                            }
                            break;
                        }
                    }
                    return result.data;
                };
                debug(requests) tracef("leave streaming get");
                return _response;
            }
        }
        dataStream.close();
        dataStream = null;
        response = serverResponse(_controlChannel);
        code = responseToCode(response);
        if ( code/100 == 2 ) {
            debug(requests) tracef("Successfully received %d bytes", _response._responseBody.length);
        }
        _response.code = code;
        return _response;
    }
    /// Copy the settings of `r` into this request and run it.
    ///
    /// Handles methods "GET", "POST" and "LIST"; anything else is a
    /// programming error and hits `assert(0)`.
    FTPResponse execute(Request r)
    {
        string method = r.method;
        _uri = r.uri();
        _authenticator = r.authenticator;
        _maxContentLength = r.maxContentLength;
        _useStreaming = r.useStreaming;
        _verbosity = r.verbosity;
        _cm = r.cm;
        _postData = r.postData;
        _bufferSize = r.bufferSize;
        _proxy = r.proxy;
        _bind = r.bind;
        _timeout = r.timeout;

        if ( method == "GET" )
        {
            return get();
        }
        if ( method == "POST" )
        {
            return post();
        }
        if ( method == "LIST" ) {
            return list();
        }
        assert(0, "Can't handle method %s for ftp request".format(method));
    }
}

//package unittest {
//    import std.process;
//
//    globalLogLevel(LogLevel.info);
//    bool unreliable_network = environment.get("UNRELIABLENETWORK", "false") == "true";
//
//    info("testing ftp");
//    auto rq = FTPRequest();
//    info("ftp post ", "ftp://speedtest.tele2.net/upload/TEST.TXT");
//    auto rs = rq.post("ftp://speedtest.tele2.net/upload/TEST.TXT", "test, ignore please\n".representation);
//    assert(unreliable_network || rs.code == 226);
//    info("ftp get  ", "ftp://speedtest.tele2.net/nonexistent", ", in same session.");
//    rs = rq.get("ftp://speedtest.tele2.net/nonexistent");
//    assert(unreliable_network || rs.code != 226);
//    info("ftp get  ", "ftp://speedtest.tele2.net/1KB.zip", ", in same session.");
//    rs = rq.get("ftp://speedtest.tele2.net/1KB.zip");
//    assert(unreliable_network || rs.code == 226);
//    assert(unreliable_network || rs.responseBody.length == 1024);
//    info("ftp post ", "ftp://speedtest.tele2.net/upload/TEST.TXT");
//    rs = rq.post("ftp://speedtest.tele2.net/upload/TEST.TXT", "another test, ignore please\n".representation);
//    assert(unreliable_network || rs.code == 226);
//    info("ftp get  ", "ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
//    try {
//        rs = rq.get("ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
//    }
//    catch (ConnectError e)
//    {
//    }
//    assert(unreliable_network || rs.code == 226);
//    info("ftp get  ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT with authenticator");
//    rq.authenticator = new FtpAuthentication("anonymous", "requests@");
//    try {
//        rs = rq.get("ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
//    }
//    catch (ConnectError e)
//    {
//    }
//    assert(unreliable_network || rs.code == 226);
//    assert(unreliable_network || rs.finalURI.path == "/pub/FreeBSD/README.TXT");
//    assert(unreliable_network || rq.format("%m|%h|%p|%P|%q|%U") == "GET|ftp.iij.ad.jp|21|/pub/FreeBSD/README.TXT||ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
//    assert(unreliable_network || rs.format("%h|%p|%P|%q|%U") == "ftp.iij.ad.jp|21|/pub/FreeBSD/README.TXT||ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
//    info("testing ftp - done.");
//}