1 module requests.ftp;
2
3 private:
4 import std.ascii;
5 import std.algorithm;
6 import std.conv;
7 import std.datetime;
8 import std.format;
9 import std.exception;
10 import std.string;
11 import std.range;
12 import std.experimental.logger;
13 import std.stdio;
14 import std.path;
15 import std.traits;
16 import std.typecons;
17
18 import requests.uri;
19 import requests.utils;
20 import requests.streams;
21 import requests.base;
22 import requests.request;
23 import requests.connmanager;
24 import requests.rangeadapter;
25
/// Exception thrown when a reply cannot be read from the FTP control channel.
public class FTPServerResponseError : Exception
{
    /// Plain exception constructor: all arguments are handed to `Exception` unchanged.
    this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow
    {
        super(message, file, line, next);
    }
}
31
/// Response type returned by `FTPRequest`; it adds no members to the base `Response`.
public class FTPResponse : Response
{
}
34
/// `Auth` implementation that simply stores a user name and a password.
public class FtpAuthentication : Auth
{
    private string _username;
    private string _password;

    /// Constructor.
    /// Params:
    ///    username = username
    ///    password = password
    ///
    this(string username, string password)
    {
        this._username = username;
        this._password = password;
    }

    /// Returns: the user name given to the constructor.
    override string userName()
    {
        return this._username;
    }

    /// Returns: the password given to the constructor.
    override string password()
    {
        return this._password;
    }

    /// Returns: always `null`; this authenticator supplies no headers.
    override string[string] authHeaders(string domain)
    {
        return null;
    }
}
58
/// Initial value of `FTPRequest._bufferSize` (8 KiB).
enum defaultBufferSize = 8 * 1024;
60
61 public struct FTPRequest {
62 private {
63 URI _uri;
64 Duration _timeout = 60.seconds;
65 uint _verbosity = 0;
66 size_t _bufferSize = defaultBufferSize;
67 long _maxContentLength = 5*1024*1024*1024;
68 long _contentLength = -1;
69 long _contentReceived;
70 NetworkStream _controlChannel;
71 string[] _responseHistory;
72 FTPResponse _response;
73 bool _useStreaming;
74 Auth _authenticator;
75 string _method;
76 string _proxy;
77 string _bind;
78 RefCounted!ConnManager _cm;
79 InputRangeAdapter _postData;
80 }
81 mixin(Getter_Setter!Duration("timeout"));
82 mixin(Getter_Setter!uint("verbosity"));
83 mixin(Getter_Setter!size_t("bufferSize"));
84 mixin(Getter_Setter!long("maxContentLength"));
85 mixin(Getter_Setter!bool("useStreaming"));
86 mixin(Getter("contentLength"));
87 mixin(Getter("contentReceived"));
88 mixin(Setter!Auth("authenticator"));
89 mixin(Getter_Setter!string("proxy"));
90 mixin(Getter_Setter!string("bind"));
91
/// Returns: the list of control-channel replies recorded so far (not a copy).
@property final string[] responseHistory() @safe @nogc nothrow
{
    return this._responseHistory;
}
/// Create a request for the given URL string, which is parsed into a `URI`.
this(string uri)
{
    this._uri = URI(uri);
}
98
/// Create a request for an already parsed `URI`.
this(in URI uri)
{
    this._uri = uri;
}
102
/// Destructor. Intentionally does nothing: closing the control channel here is disabled.
/// NOTE(review): presumably because the channel may be held by the connection manager — confirm.
~this()
{
    //if ( _controlChannel ) {
    //    _controlChannel.close();
    //}
}
/// Returns: a short description of the form `FTPRequest(<method>, <uri>)`.
string toString() const
{
    auto target = _uri.uri();
    return "FTPRequest(%s, %s)".format(_method, target);
}
/**
 * Render request properties according to a printf-like template.
 *
 * Supported specs: `%h` host, `%m` method, `%p` port, `%P` path,
 * `%q` query string, `%U` full URI. Text between specs is copied verbatim.
 *
 * Throws: FormatException for any other spec character.
 */
string format(string fmt) const
{
    import std.array : appender;

    auto sink = appender!string();
    auto parser = FormatSpec!char(fmt);
    while ( parser.writeUpToNextSpec(sink) )
    {
        switch ( parser.spec )
        {
            // Remote hostname.
            case 'h':
                sink.put(_uri.host);
                break;
            // method.
            case 'm':
                sink.put(_method);
                break;
            // Remote port.
            case 'p':
                sink.put("%d".format(_uri.port));
                break;
            // Path
            case 'P':
                sink.put(_uri.path);
                break;
            // query parameters supplied with url.
            case 'q':
                sink.put(_uri.query);
                break;
            // complete URI
            case 'U':
                sink.put(_uri.uri());
                break;
            default:
                throw new FormatException("Unknown Request format spec " ~ parser.spec);
        }
    }
    return sink.data;
}
/**
 * Send one command over the given control channel and wait for the reply.
 *
 * The reply line is appended to the response history.
 * Params:
 *    cmd = complete command text (callers include the trailing CRLF)
 *    __controlChannel = stream to talk to
 * Returns: numeric reply code.
 */
ushort sendCmdGetResponse(string cmd, NetworkStream __controlChannel)
{
    auto printable = cmd.strip;
    debug(requests) tracef("cmd to server: %s", printable);
    if ( _verbosity >= 1 )
    {
        writefln("> %s", printable);
    }
    __controlChannel.send(cmd);
    auto reply = serverResponse(__controlChannel);
    _responseHistory ~= reply;
    return responseToCode(reply);
}
157
/// Convert the first three characters of a reply line into its numeric code.
ushort responseToCode(string response) pure const @safe
{
    auto digits = response[0 .. 3];
    return digits.to!ushort;
}
161
/**
 * Switch the request to a new URL.
 *
 * An open control channel is closed and dropped when host, port or user name
 * of the new URL differ from the current ones.
 */
void handleChangeURI(in string uri) @safe
{
    URI target = URI(uri);
    if ( _controlChannel )
    {
        immutable sameServer = target.host == _uri.host
            && target.port == _uri.port
            && target.username == _uri.username;
        if ( !sameServer )
        {
            _controlChannel.close();
            _controlChannel = null;
        }
    }
    _uri = target;
}
172
/**
 * Read one complete reply from the control channel.
 *
 * Bytes are read one at a time. Whenever a newline arrives, the lines received
 * since the previous newline are examined; the first line that looks like
 * "NNN text" (three digits followed by a space) ends the reply and is returned.
 * Other lines (e.g. "NNN-..." continuation lines) are skipped.
 *
 * While reading, the channel's read timeout is set to `_timeout`; it is reset
 * to zero on exit.
 *
 * Params:
 *    __controlChannel = stream to read from
 * Returns: the final reply line without line terminator.
 * Throws: FTPServerResponseError if reading fails, the peer closes the
 *         connection, or about 16 KiB arrive without a final reply line.
 */
string serverResponse(NetworkStream __controlChannel) {
    string buffer;
    immutable bufferLimit = 16*1024;
    // offset of the first byte in `buffer` that has not been examined yet
    size_t examined = 0;

    __controlChannel.readTimeout = _timeout;
    scope(exit) {
        __controlChannel.readTimeout = 0.seconds;
    }
    debug(requests) trace("Wait on control channel");
    auto b = new ubyte[1];
    while ( __controlChannel && __controlChannel.isConnected && buffer.length < bufferLimit ) {
        ptrdiff_t rc;
        try {
            rc = __controlChannel.receive(b);
        }
        catch (Exception e) {
            error("Failed to read response from server");
            throw new FTPServerResponseError("Failed to read server responce over control channel", __FILE__, __LINE__, e);
        }
        // rc == 0: peer closed the connection; rc < 0 must never reach the slice below
        if ( rc <= 0 ) {
            error("Failed to read response from server");
            throw new FTPServerResponseError("Failed to read server responce over control channel", __FILE__, __LINE__);
        }
        buffer ~= b[0..rc];
        if ( !buffer.endsWith('\n') ) {
            continue;
        }
        // Only look at what arrived since the last newline: earlier lines were
        // already printed and already rejected as final reply lines.
        auto freshLines = buffer[examined..$].splitLines;
        examined = buffer.length;
        if ( _verbosity >= 1 ) {
            freshLines.each!(l=>writefln("< %s", l));
        }
        auto finalLines = freshLines.
            filter!(l => l.length>3 && l[3]==' ' && l[0..3].all!isDigit);
        if ( !finalLines.empty ) {
            return finalLines.front;
        }
    }
    throw new FTPServerResponseError("Failed to read server responce over control channel");
}
/**
 * Walk down a directory path on the server, creating missing directories.
 *
 * `path[0]` is the directory we are already in; every following element is
 * entered with CWD, and created with MKD first when CWD is refused.
 * For example:
 * path = ["", "a", "b"] - we stay in root (path[0]), then cd to (or create
 * and cd to) "a", then the same for "b".
 *
 * Params:
 *    path = path components, at least two elements
 * Returns: the reply code of the last command sent; a code >= 300 means the
 *          walk stopped at the first directory that could be neither entered
 *          nor created.
 */
ushort tryCdOrCreatePath(string[] path) {
    debug(requests) info("Trying to create path %s".format(path));
    enforce(path.length>=2, "You called tryCdOrCreate, but there is nothing to create: %s".format(path));
    ushort code;
    foreach ( next_dir; path[1..$] ) {
        code = sendCmdGetResponse("CWD " ~ next_dir ~ "\r\n", _controlChannel);
        if ( code < 300 ) {
            continue;
        }
        // try to create, then again CWD
        code = sendCmdGetResponse("MKD " ~ next_dir ~ "\r\n", _controlChannel);
        if ( code >= 300 ) {
            return code;
        }
        code = sendCmdGetResponse("CWD " ~ next_dir ~ "\r\n", _controlChannel);
        if ( code >= 300 ) {
            // created but still not enterable: do not keep creating deeper
            // directories in the wrong place
            return code;
        }
    }
    return code;
}
238
/**
 * Upload `content` to `uri`.
 *
 * Accepts anything castable to `ubyte[]` or a range of character/ubyte ranges.
 * A non-null `uri` replaces the current URI first; the content is wrapped with
 * `makeAdapter` and the upload is performed by the parameterless `post()`.
 * `args` are accepted but not used.
 */
FTPResponse post(R, A...)(string uri, R content, A args)
    if ( __traits(compiles, cast(ubyte[])content)
         || (rank!R == 2 && isSomeChar!(Unqual!(typeof(content.front.front))))
         || (rank!R == 2 && (is(Unqual!(typeof(content.front.front)) == ubyte)))
    )
{
    if ( uri )
    {
        handleChangeURI(uri);
    }
    _postData = makeAdapter(content);
    return post();
}
251
/**
 * Upload the data held in `_postData` to the file named by `_uri` (STOR).
 *
 * Steps: obtain a control connection (reused from the connection manager, or
 * freshly opened, registered with the manager and logged in), remember the
 * current directory, CWD into the target directory (creating it on a 550
 * reply), open a passive data connection, switch to binary mode, STOR the
 * file and stream all chunks. The original directory is restored on exit.
 *
 * Returns: the response object; `code` holds the last relevant reply code
 *          (500 when the PASV reply cannot be parsed).
 */
FTPResponse post()
{
    string response;
    ushort code;

    _response = new FTPResponse;
    _response._startedAt = Clock.currTime;
    _method = "POST";

    scope(exit) {
        _response._finishedAt = Clock.currTime;
    }

    _response.uri = _uri;
    _response.finalURI = _uri;

    _controlChannel = _cm.get(_uri.scheme, _uri.host, _uri.port);

    if ( !_controlChannel ) {
        _controlChannel = new TCPStream();
        // same setup as get()/list(): honour the bind address and hand the new
        // connection to the connection manager so it can be reused later
        _controlChannel.bind(_bind);
        _controlChannel.connect(_uri.host, _uri.port, _timeout);
        if ( auto purged_connection = _cm.put(_uri.scheme, _uri.host, _uri.port, _controlChannel) )
        {
            debug(requests) tracef("closing purged connection %s", purged_connection);
            purged_connection.close();
        }
        _response._connectedAt = Clock.currTime;
        response = serverResponse(_controlChannel);
        _responseHistory ~= response;

        code = responseToCode(response);
        debug(requests) tracef("Server initial response: %s", response);
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }
        // Log in
        string user, pass;
        if ( _authenticator ) {
            user = _authenticator.userName();
            pass = _authenticator.password();
        }
        else{
            user = _uri.username.length ? _uri.username : "anonymous";
            pass = _uri.password.length ? _uri.password : "requests@";
        }
        // show only the first character of the password; an authenticator may
        // return an empty password, so do not index blindly
        debug(requests) tracef("Use %s:%s%s as username:password", user,
            pass.length ? pass[0..1] : "",
            replicate("-", pass.length ? pass.length-1 : 0));

        code = sendCmdGetResponse("USER " ~ user ~ "\r\n", _controlChannel);
        if ( code/100 > 3 ) {
            _response.code = code;
            return _response;
        } else if ( code/100 == 3) {
            // server asks for a password
            code = sendCmdGetResponse("PASS " ~ pass ~ "\r\n", _controlChannel);
            if ( code/100 > 2 ) {
                _response.code = code;
                return _response;
            }
        }
    }
    else {
        _response._connectedAt = Clock.currTime;
    }

    code = sendCmdGetResponse("PWD\r\n", _controlChannel);
    string pwd;
    if ( code/100 == 2 ) {
        // like '257 "/home/testuser"'
        auto a = _responseHistory[$-1].split();
        if ( a.length > 1 ) {
            pwd = a[1].chompPrefix(`"`).chomp(`"`);
        }
    }
    // go back to where we started, whatever happens below
    scope (exit) {
        if ( pwd && _controlChannel ) {
            sendCmdGetResponse("CWD " ~ pwd ~ "\r\n", _controlChannel);
        }
    }

    auto path = dirName(_uri.path);
    if ( path != "/") {
        path = path.chompPrefix("/");
    }
    code = sendCmdGetResponse("CWD " ~ path ~ "\r\n", _controlChannel);
    if ( code == 550 ) {
        // try to create directory end enter it
        code = tryCdOrCreatePath(dirName(_uri.path).split('/'));
    }
    if ( code/100 > 2 ) {
        _response.code = code;
        return _response;
    }

    code = sendCmdGetResponse("PASV\r\n", _controlChannel);
    if ( code/100 > 2 ) {
        _response.code = code;
        return _response;
    }
    // something like  "227 Entering Passive Mode (132,180,15,2,210,187)" expected
    // in last response.
    // Cut anything between ( and )
    auto v = _responseHistory[$-1].findSplitBefore(")")[0].findSplitAfter("(")[1];
    string host;
    ushort port;
    try {
        ubyte a1,a2,a3,a4,p1,p2;
        formattedRead(v, "%d,%d,%d,%d,%d,%d", &a1, &a2, &a3, &a4, &p1, &p2);
        host = std.format.format("%d.%d.%d.%d", a1, a2, a3, a4);
        port = (p1<<8) + p2;
    } catch (FormatException e) {
        error("Failed to parse ", v);
        _response.code = 500;
        return _response;
    }

    auto dataStream = new TCPStream();
    scope (exit ) {
        if ( dataStream !is null ) {
            dataStream.close();
        }
    }

    // bind the data connection the same way connectData() does for downloads
    dataStream.bind(_bind);
    dataStream.connect(host, port, _timeout);

    code = sendCmdGetResponse("TYPE I\r\n", _controlChannel);
    if ( code/100 > 2 ) {
        _response.code = code;
        return _response;
    }

    code = sendCmdGetResponse("STOR " ~ baseName(_uri.path) ~ "\r\n", _controlChannel);
    if ( code/100 > 1 ) {
        _response.code = code;
        return _response;
    }
    size_t uploaded;
    while ( !_postData.empty ) {
        auto chunk = _postData.front;
        uploaded += chunk.length;
        dataStream.send(chunk);
        _postData.popFront;
    }
    debug(requests) tracef("sent");
    // closing the data connection tells the server the upload is complete
    dataStream.close();
    dataStream = null;
    response = serverResponse(_controlChannel);
    code = responseToCode(response);
    if ( code/100 == 2 ) {
        debug(requests) tracef("Successfully uploaded %d bytes", uploaded);
    }
    _response.code = code;
    return _response;
}
/**
 * Open a data connection to the endpoint announced in a PASV reply.
 *
 * Params:
 *    v = the six comma separated numbers "h1,h2,h3,h4,p1,p2"
 * Returns: a connected `TCPStream` (bound to `_bind` before connecting).
 * Throws: whatever `formattedRead` throws on malformed input, and whatever
 *         `bind`/`connect` throw.
 */
private auto connectData(string v)
{
    ubyte a1, a2, a3, a4, p1, p2;
    formattedRead(v, "%d,%d,%d,%d,%d,%d", &a1, &a2, &a3, &a4, &p1, &p2);

    // first four numbers form the IPv4 address, the last two the port (high, low byte)
    string host = std.format.format("%d.%d.%d.%d", a1, a2, a3, a4);
    ushort port = (p1<<8) + p2;

    auto stream = new TCPStream();
    stream.bind(_bind);
    stream.connect(host, port, _timeout);
    return stream;
}
412
/**
 * Download the file named by `uri` (or by the current URI when `uri` is null).
 *
 * Steps: obtain a control connection (reused from the connection manager, or
 * freshly opened, registered and logged in), remember the current directory,
 * CWD into the file's directory, switch to binary mode, ask for the SIZE,
 * open a passive data connection and RETR the file. When RETR is refused with
 * a 5xx code, a name listing (NLST) of the path is requested instead.
 *
 * Without streaming the whole body is collected in the response. With
 * `_useStreaming` the first chunk is stored and the rest is delivered through
 * `receiveAsRange.read`; the final reply code is then set when the data
 * connection reaches its end.
 *
 * Returns: the response object; `code` holds the last relevant reply code
 *          (500 when a PASV reply cannot be parsed).
 * Throws: RequestException when the content exceeds `_maxContentLength`.
 */
FTPResponse get(string uri = null) {
    enforce( uri || _uri.host, "FTP URL undefined");
    string response;
    ushort code;

    _response = new FTPResponse;
    _contentReceived = 0;
    // forget the size learned by a previous request on this object, otherwise
    // a failed SIZE below would leave a stale value in the limit check
    _contentLength = -1;
    _method = "GET";

    _response._startedAt = Clock.currTime;
    scope(exit) {
        _response._finishedAt = Clock.currTime;
    }

    if ( uri ) {
        handleChangeURI(uri);
    }

    _response.uri = _uri;
    _response.finalURI = _uri;

    _controlChannel = _cm.get(_uri.scheme, _uri.host, _uri.port);

    if ( !_controlChannel ) {
        _controlChannel = new TCPStream();
        _controlChannel.bind(_bind);
        _controlChannel.connect(_uri.host, _uri.port, _timeout);
        if ( auto purged_connection = _cm.put(_uri.scheme, _uri.host, _uri.port, _controlChannel) )
        {
            debug(requests) tracef("closing purged connection %s", purged_connection);
            purged_connection.close();
        }
        _response._connectedAt = Clock.currTime;
        response = serverResponse(_controlChannel);
        _responseHistory ~= response;

        code = responseToCode(response);
        debug(requests) tracef("Server initial response: %s", response);
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }
        // Log in
        string user, pass;
        if ( _authenticator ) {
            user = _authenticator.userName();
            pass = _authenticator.password();
        }
        else{
            user = _uri.username.length ? _uri.username : "anonymous";
            pass = _uri.password.length ? _uri.password : "requests@";
        }
        debug(requests) tracef("Use %s:%s%s as username:password", user, pass[0], replicate("-", pass.length-1));

        code = sendCmdGetResponse("USER " ~ user ~ "\r\n", _controlChannel);
        if ( code/100 > 3 ) {
            _response.code = code;
            return _response;
        } else if ( code/100 == 3) {
            // server asks for a password
            code = sendCmdGetResponse("PASS " ~ pass ~ "\r\n", _controlChannel);
            if ( code/100 > 2 ) {
                _response.code = code;
                return _response;
            }
        }
    }
    else {
        _response._connectedAt = Clock.currTime;
    }

    code = sendCmdGetResponse("PWD\r\n", _controlChannel);
    string pwd;
    if ( code/100 == 2 ) {
        // like '257 "/home/testuser"'
        auto a = _responseHistory[$-1].split();
        if ( a.length > 1 ) {
            pwd = a[1].chompPrefix(`"`).chomp(`"`);
        }
    }
    // restore the starting directory on exit; in streaming mode this is done
    // by the read delegate once the transfer has finished
    scope (exit) {
        if ( pwd && _controlChannel && !_useStreaming ) {
            sendCmdGetResponse("CWD " ~ pwd ~ "\r\n", _controlChannel);
        }
    }

    auto path = dirName(_uri.path);
    if ( path != "/") {
        path = path.chompPrefix("/");
    }
    code = sendCmdGetResponse("CWD " ~ path ~ "\r\n", _controlChannel);
    if ( code/100 > 2 ) {
        _response.code = code;
        return _response;
    }

    code = sendCmdGetResponse("TYPE I\r\n", _controlChannel);
    if ( code/100 > 2 ) {
        _response.code = code;
        return _response;
    }

    code = sendCmdGetResponse("SIZE " ~ baseName(_uri.path) ~ "\r\n", _controlChannel);
    if ( code/100 == 2 ) {
        // something like
        // 213 229355520
        auto s = _responseHistory[$-1].findSplitAfter(" ");
        if ( s.length ) {
            try {
                _contentLength = to!long(s[1]);
            } catch (ConvException) {
                debug(requests) trace("Failed to convert string %s to file size".format(s[1]));
            }
        }
    }

    if ( _maxContentLength > 0 && _contentLength > _maxContentLength ) {
        throw new RequestException("maxContentLength exceeded for ftp data");
    }

    code = sendCmdGetResponse("PASV\r\n", _controlChannel);
    if ( code/100 > 2 ) {
        _response.code = code;
        return _response;
    }
    // something like  "227 Entering Passive Mode (132,180,15,2,210,187)" expected
    // in last response.
    // Cut anything between ( and )
    auto v = _responseHistory[$-1].findSplitBefore(")")[0].findSplitAfter("(")[1];

    TCPStream dataStream;
    try{
        dataStream = connectData(v);
    } catch (FormatException e) {
        error("Failed to parse ", v);
        _response.code = 500;
        return _response;
    }
    // in streaming mode the data connection outlives this call
    scope (exit ) {
        if ( dataStream !is null && !_response._receiveAsRange.activated ) {
            dataStream.close();
        }
    }

    _response._requestSentAt = Clock.currTime;

    code = sendCmdGetResponse("RETR " ~ baseName(_uri.path) ~ "\r\n", _controlChannel);
    if ( code/100 > 1 && code/100 < 5) {
        _response.code = code;
        return _response;
    }
    if ( code/100 == 5) {
        // RETR refused: fall back to a name listing over a new data connection
        dataStream.close();
        dataStream = null; // already closed, keep scope(exit) from closing it again
        code = sendCmdGetResponse("PASV\r\n", _controlChannel);
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }
        v = _responseHistory[$-1].findSplitBefore(")")[0].findSplitAfter("(")[1];
        try {
            dataStream = connectData(v);
        } catch (FormatException e) {
            // same handling as for the first PASV reply
            error("Failed to parse ", v);
            _response.code = 500;
            return _response;
        }
        code = sendCmdGetResponse("NLST " ~ _uri.path ~ "\r\n", _controlChannel);
        if ( code/100 > 1 ) {
            _response.code = code;
            return _response;
        }
    }

    dataStream.readTimeout = _timeout;
    while ( true ) {
        auto b = new ubyte[_bufferSize];
        auto rc = dataStream.receive(b);
        if ( rc <= 0 ) {
            debug(requests) trace("done");
            break;
        }
        debug(requests) tracef("got %d bytes from data channel", rc);

        _contentReceived += rc;
        _response._responseBody.putNoCopy(b[0..rc]);

        if ( _maxContentLength && _response._responseBody.length >= _maxContentLength ) {
            throw new RequestException("maxContentLength exceeded for ftp data");
        }
        if ( _useStreaming ) {
            debug(requests) trace("ftp uses streaming");

            // snapshot what the delegate needs so that it does not depend on
            // later changes of the request's settings
            auto __maxContentLength = _maxContentLength;
            auto __contentLength = _contentLength;
            auto __contentReceived = _contentReceived;
            auto __bufferSize = _bufferSize;
            auto __dataStream = dataStream;
            auto __controlChannel = _controlChannel;

            _response._contentLength = _contentLength;
            _response.receiveAsRange.activated = true;
            _response.receiveAsRange.data.length = 0;
            // the chunk read above becomes the first element of the range
            _response.receiveAsRange.data = _response._responseBody.data;
            _response.receiveAsRange.read = delegate ubyte[] () {
                Buffer!ubyte result;
                // check if we received everything we need
                if ( __maxContentLength > 0 && __contentReceived >= __maxContentLength )
                {
                    throw new RequestException("ContentLength > maxContentLength (%d>%d)".
                        format(__contentLength, __maxContentLength));
                }
                // have to continue
                auto b = new ubyte[__bufferSize];
                ptrdiff_t read;
                try {
                    read = __dataStream.receive(b);
                }
                catch (Exception e) {
                    throw new RequestException("streaming_in error reading from socket", __FILE__, __LINE__, e);
                }

                if ( read > 0 ) {
                    _response._contentReceived += read;
                    __contentReceived += read;
                    result.putNoCopy(b[0..read]);
                    return result.data;
                }
                // read <= 0: end of data. Treated like the non-streaming loop
                // does (rc <= 0), so a negative result cannot spin forever.
                debug(requests) tracef("streaming_in: server closed connection");
                __dataStream.close();
                code = responseToCode(serverResponse(__controlChannel));
                if ( code/100 == 2 ) {
                    debug(requests) tracef("Successfully received %d bytes", _response._responseBody.length);
                }
                _response.code = code;
                // restore the starting directory, but only if PWD told us one
                if ( pwd ) {
                    sendCmdGetResponse("CWD " ~ pwd ~ "\r\n", __controlChannel);
                }
                return result.data;
            };
            debug(requests) tracef("leave streaming get");
            return _response;
        }
    }
    dataStream.close();
    response = serverResponse(_controlChannel);
    code = responseToCode(response);
    if ( code/100 == 2 ) {
        debug(requests) tracef("Successfully received %d bytes", _response._responseBody.length);
    }
    _response.code = code;
    return _response;
}
/**
 * Retrieve a directory listing (LIST) for `uri` (or for the current URI when
 * `uri` is null).
 *
 * Steps mirror `get`: obtain a control connection (reused from the connection
 * manager, or freshly opened, registered and logged in), remember the current
 * directory, CWD into the parent directory, switch to binary mode, ask for
 * the SIZE, open a passive data connection and send LIST for the last path
 * element. When LIST is refused with a 5xx code, NLST of the full path is
 * requested instead.
 *
 * Without streaming the whole listing is collected in the response. With
 * `_useStreaming` the first chunk is stored and the rest is delivered through
 * `receiveAsRange.read`.
 *
 * Returns: the response object; `code` holds the last relevant reply code
 *          (500 when a PASV reply cannot be parsed).
 * Throws: RequestException when the content exceeds `_maxContentLength`.
 */
FTPResponse list(string uri = null) {
    enforce( uri || _uri.host, "FTP URL undefined");
    string response;
    ushort code;

    _response = new FTPResponse;
    _contentReceived = 0;
    // forget the size learned by a previous request on this object, otherwise
    // a failed SIZE below would leave a stale value in the limit check
    _contentLength = -1;
    // NOTE(review): the method is recorded as "GET" although execute()
    // dispatches here for "LIST" — confirm whether "%m" should report LIST.
    _method = "GET";

    _response._startedAt = Clock.currTime;
    scope(exit) {
        _response._finishedAt = Clock.currTime;
    }

    if ( uri ) {
        handleChangeURI(uri);
    }

    _response.uri = _uri;
    _response.finalURI = _uri;

    _controlChannel = _cm.get(_uri.scheme, _uri.host, _uri.port);

    if ( !_controlChannel ) {
        _controlChannel = new TCPStream();
        _controlChannel.bind(_bind);
        _controlChannel.connect(_uri.host, _uri.port, _timeout);
        if ( auto purged_connection = _cm.put(_uri.scheme, _uri.host, _uri.port, _controlChannel) )
        {
            debug(requests) tracef("closing purged connection %s", purged_connection);
            purged_connection.close();
        }
        _response._connectedAt = Clock.currTime;
        response = serverResponse(_controlChannel);
        _responseHistory ~= response;

        code = responseToCode(response);
        debug(requests) tracef("Server initial response: %s", response);
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }
        // Log in
        string user, pass;
        if ( _authenticator ) {
            user = _authenticator.userName();
            pass = _authenticator.password();
        }
        else{
            user = _uri.username.length ? _uri.username : "anonymous";
            pass = _uri.password.length ? _uri.password : "requests@";
        }
        debug(requests) tracef("Use %s:%s%s as username:password", user, pass[0], replicate("-", pass.length-1));

        code = sendCmdGetResponse("USER " ~ user ~ "\r\n", _controlChannel);
        if ( code/100 > 3 ) {
            _response.code = code;
            return _response;
        } else if ( code/100 == 3) {
            // server asks for a password
            code = sendCmdGetResponse("PASS " ~ pass ~ "\r\n", _controlChannel);
            if ( code/100 > 2 ) {
                _response.code = code;
                return _response;
            }
        }
    }
    else {
        _response._connectedAt = Clock.currTime;
    }

    code = sendCmdGetResponse("PWD\r\n", _controlChannel);
    string pwd;
    if ( code/100 == 2 ) {
        // like '257 "/home/testuser"'
        auto a = _responseHistory[$-1].split();
        if ( a.length > 1 ) {
            pwd = a[1].chompPrefix(`"`).chomp(`"`);
        }
    }
    // restore the starting directory on exit; in streaming mode this is done
    // by the read delegate once the transfer has finished
    scope (exit) {
        if ( pwd && _controlChannel && !_useStreaming ) {
            sendCmdGetResponse("CWD " ~ pwd ~ "\r\n", _controlChannel);
        }
    }

    auto path = dirName(_uri.path);
    if ( path != "/") {
        path = path.chompPrefix("/");
    }
    code = sendCmdGetResponse("CWD " ~ path ~ "\r\n", _controlChannel);
    if ( code/100 > 2 ) {
        _response.code = code;
        return _response;
    }

    code = sendCmdGetResponse("TYPE I\r\n", _controlChannel);
    if ( code/100 > 2 ) {
        _response.code = code;
        return _response;
    }

    code = sendCmdGetResponse("SIZE " ~ baseName(_uri.path) ~ "\r\n", _controlChannel);
    if ( code/100 == 2 ) {
        // something like
        // 213 229355520
        auto s = _responseHistory[$-1].findSplitAfter(" ");
        if ( s.length ) {
            try {
                _contentLength = to!long(s[1]);
            } catch (ConvException) {
                debug(requests) trace("Failed to convert string %s to file size".format(s[1]));
            }
        }
    }

    if ( _maxContentLength > 0 && _contentLength > _maxContentLength ) {
        throw new RequestException("maxContentLength exceeded for ftp data");
    }

    code = sendCmdGetResponse("PASV\r\n", _controlChannel);
    if ( code/100 > 2 ) {
        _response.code = code;
        return _response;
    }
    // something like  "227 Entering Passive Mode (132,180,15,2,210,187)" expected
    // in last response.
    // Cut anything between ( and )
    auto v = _responseHistory[$-1].findSplitBefore(")")[0].findSplitAfter("(")[1];

    TCPStream dataStream;
    try{
        dataStream = connectData(v);
    } catch (FormatException e) {
        error("Failed to parse ", v);
        _response.code = 500;
        return _response;
    }
    // in streaming mode the data connection outlives this call
    scope (exit ) {
        if ( dataStream !is null && !_response._receiveAsRange.activated ) {
            dataStream.close();
        }
    }

    _response._requestSentAt = Clock.currTime;

    code = sendCmdGetResponse("LIST " ~ baseName(_uri.path) ~ "\r\n", _controlChannel);
    if ( code/100 > 1 && code/100 < 5) {
        _response.code = code;
        return _response;
    }
    if ( code/100 == 5) {
        // LIST refused: fall back to a name listing over a new data connection
        dataStream.close();
        dataStream = null; // already closed, keep scope(exit) from closing it again
        code = sendCmdGetResponse("PASV\r\n", _controlChannel);
        if ( code/100 > 2 ) {
            _response.code = code;
            return _response;
        }
        v = _responseHistory[$-1].findSplitBefore(")")[0].findSplitAfter("(")[1];
        try {
            dataStream = connectData(v);
        } catch (FormatException e) {
            // same handling as for the first PASV reply
            error("Failed to parse ", v);
            _response.code = 500;
            return _response;
        }
        code = sendCmdGetResponse("NLST " ~ _uri.path ~ "\r\n", _controlChannel);
        if ( code/100 > 1 ) {
            _response.code = code;
            return _response;
        }
    }

    dataStream.readTimeout = _timeout;
    while ( true ) {
        auto b = new ubyte[_bufferSize];
        auto rc = dataStream.receive(b);
        if ( rc <= 0 ) {
            debug(requests) trace("done");
            break;
        }
        debug(requests) tracef("got %d bytes from data channel", rc);

        _contentReceived += rc;
        _response._responseBody.putNoCopy(b[0..rc]);

        if ( _maxContentLength && _response._responseBody.length >= _maxContentLength ) {
            throw new RequestException("maxContentLength exceeded for ftp data");
        }
        if ( _useStreaming ) {
            debug(requests) trace("ftp uses streaming");

            // snapshot what the delegate needs so that it does not depend on
            // later changes of the request's settings
            auto __maxContentLength = _maxContentLength;
            auto __contentLength = _contentLength;
            auto __contentReceived = _contentReceived;
            auto __bufferSize = _bufferSize;
            auto __dataStream = dataStream;
            auto __controlChannel = _controlChannel;

            _response._contentLength = _contentLength;
            _response.receiveAsRange.activated = true;
            _response.receiveAsRange.data.length = 0;
            // the chunk read above becomes the first element of the range
            _response.receiveAsRange.data = _response._responseBody.data;
            _response.receiveAsRange.read = delegate ubyte[] () {
                Buffer!ubyte result;
                // check if we received everything we need
                if ( __maxContentLength > 0 && __contentReceived >= __maxContentLength )
                {
                    throw new RequestException("ContentLength > maxContentLength (%d>%d)".
                        format(__contentLength, __maxContentLength));
                }
                // have to continue
                auto b = new ubyte[__bufferSize];
                ptrdiff_t read;
                try {
                    read = __dataStream.receive(b);
                }
                catch (Exception e) {
                    throw new RequestException("streaming_in error reading from socket", __FILE__, __LINE__, e);
                }

                if ( read > 0 ) {
                    _response._contentReceived += read;
                    __contentReceived += read;
                    result.putNoCopy(b[0..read]);
                    return result.data;
                }
                // read <= 0: end of data. Treated like the non-streaming loop
                // does (rc <= 0), so a negative result cannot spin forever.
                debug(requests) tracef("streaming_in: server closed connection");
                __dataStream.close();
                code = responseToCode(serverResponse(__controlChannel));
                if ( code/100 == 2 ) {
                    debug(requests) tracef("Successfully received %d bytes", _response._responseBody.length);
                }
                _response.code = code;
                // restore the starting directory, but only if PWD told us one
                if ( pwd ) {
                    sendCmdGetResponse("CWD " ~ pwd ~ "\r\n", __controlChannel);
                }
                return result.data;
            };
            debug(requests) tracef("leave streaming get");
            return _response;
        }
    }
    dataStream.close();
    response = serverResponse(_controlChannel);
    code = responseToCode(response);
    if ( code/100 == 2 ) {
        debug(requests) tracef("Successfully received %d bytes", _response._responseBody.length);
    }
    _response.code = code;
    return _response;
}
/**
 * Run the FTP operation described by a generic `Request`.
 *
 * All relevant settings are copied from `r` into this object, then the call
 * is dispatched on `r.method`: "GET" -> `get`, "POST" -> `post`,
 * "LIST" -> `list`. Any other method hits `assert(0)`.
 */
FTPResponse execute(Request r)
{
    string method = r.method;

    // take over the caller's configuration
    _uri = r.uri();
    _authenticator = r.authenticator;
    _maxContentLength = r.maxContentLength;
    _useStreaming = r.useStreaming;
    _verbosity = r.verbosity;
    _cm = r.cm;
    _postData = r.postData;
    _bufferSize = r.bufferSize;
    _proxy = r.proxy;
    _bind = r.bind;
    _timeout = r.timeout;

    switch ( method )
    {
        case "GET":
            return get();
        case "POST":
            return post();
        case "LIST":
            return list();
        default:
            assert(0, "Can't handle method %s for ftp request".format(method));
    }
}
938 }
939
940 //package unittest {
941 // import std.process;
942 //
943 // globalLogLevel(LogLevel.info);
944 // bool unreliable_network = environment.get("UNRELIABLENETWORK", "false") == "true";
945 //
946 // info("testing ftp");
947 // auto rq = FTPRequest();
948 // info("ftp post ", "ftp://speedtest.tele2.net/upload/TEST.TXT");
949 // auto rs = rq.post("ftp://speedtest.tele2.net/upload/TEST.TXT", "test, ignore please\n".representation);
950 // assert(unreliable_network || rs.code == 226);
951 // info("ftp get ", "ftp://speedtest.tele2.net/nonexistent", ", in same session.");
952 // rs = rq.get("ftp://speedtest.tele2.net/nonexistent");
953 // assert(unreliable_network || rs.code != 226);
954 // info("ftp get ", "ftp://speedtest.tele2.net/1KB.zip", ", in same session.");
955 // rs = rq.get("ftp://speedtest.tele2.net/1KB.zip");
956 // assert(unreliable_network || rs.code == 226);
957 // assert(unreliable_network || rs.responseBody.length == 1024);
958 // info("ftp post ", "ftp://speedtest.tele2.net/upload/TEST.TXT");
959 // rs = rq.post("ftp://speedtest.tele2.net/upload/TEST.TXT", "another test, ignore please\n".representation);
960 // assert(unreliable_network || rs.code == 226);
961 // info("ftp get ", "ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
962 // try {
963 // rs = rq.get("ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
964 // }
965 // catch (ConnectError e)
966 // {
967 // }
968 // assert(unreliable_network || rs.code == 226);
969 // info("ftp get ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT with authenticator");
970 // rq.authenticator = new FtpAuthentication("anonymous", "requests@");
971 // try {
972 // rs = rq.get("ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
973 // }
974 // catch (ConnectError e)
975 // {
976 // }
977 // assert(unreliable_network || rs.code == 226);
978 // assert(unreliable_network || rs.finalURI.path == "/pub/FreeBSD/README.TXT");
979 // assert(unreliable_network || rq.format("%m|%h|%p|%P|%q|%U") == "GET|ftp.iij.ad.jp|21|/pub/FreeBSD/README.TXT||ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
980 // assert(unreliable_network || rs.format("%h|%p|%P|%q|%U") == "ftp.iij.ad.jp|21|/pub/FreeBSD/README.TXT||ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
981 // info("testing ftp - done.");
982 //}