The OpenD Programming Language

1 module requests.ftp;
2 
3 private:
4 import std.ascii;
5 import std.algorithm;
6 import std.conv;
7 import std.datetime;
8 import std.format;
9 import std.exception;
10 import std.string;
11 import std.range;
12 import std.experimental.logger;
13 import std.stdio;
14 import std.path;
15 import std.traits;
16 import std.typecons;
17 
18 import requests.uri;
19 import requests.utils;
20 import requests.streams;
21 import requests.base;
22 import requests.request;
23 import requests.connmanager;
24 import requests.rangeadapter;
25 
26 public class FTPServerResponseError: Exception {
27     this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe pure nothrow {
28         super(message, file, line, next);
29     }
30 }
31 
32 public class FTPResponse : Response {
33 }
34 
35 public class FtpAuthentication: Auth {
36     private {
37         string   _username, _password;
38     }
39     /// Constructor.
40     /// Params:
41     /// username = username
42     /// password = password
43     ///
44     this(string username, string password) {
45         _username = username;
46         _password = password;
47     }
48     override string userName() {
49         return _username;
50     }
51     override string password() {
52         return _password;
53     }
54     override string[string] authHeaders(string domain) {
55         return null;
56     }
57 }
58 
59 enum defaultBufferSize = 8192;
60 
61 public struct FTPRequest {
62     private {
63         URI           _uri;
64         Duration      _timeout = 60.seconds;
65         uint          _verbosity = 0;
66         size_t        _bufferSize = defaultBufferSize;
67         long          _maxContentLength = 5*1024*1024*1024;
68         long          _contentLength = -1;
69         long          _contentReceived;
70         NetworkStream _controlChannel;
71         string[]      _responseHistory;
72         FTPResponse   _response;
73         bool          _useStreaming;
74         Auth           _authenticator;
75         string        _method;
76         string        _proxy;
77         string        _bind;
78         RefCounted!ConnManager      _cm;
79         InputRangeAdapter           _postData;
80     }
81     mixin(Getter_Setter!Duration("timeout"));
82     mixin(Getter_Setter!uint("verbosity"));
83     mixin(Getter_Setter!size_t("bufferSize"));
84     mixin(Getter_Setter!long("maxContentLength"));
85     mixin(Getter_Setter!bool("useStreaming"));
86     mixin(Getter("contentLength"));
87     mixin(Getter("contentReceived"));
88     mixin(Setter!Auth("authenticator"));
89     mixin(Getter_Setter!string("proxy"));
90     mixin(Getter_Setter!string("bind"));
91 
92     @property final string[] responseHistory() @safe @nogc nothrow {
93         return _responseHistory;
94     }
95     this(string uri) {
96         _uri = URI(uri);
97     }
98 
99     this(in URI uri) {
100         _uri = uri;
101     }
102 
103     ~this() {
104         //if ( _controlChannel ) {
105         //    _controlChannel.close();
106         //}
107     }
108     string toString() const {
109         return "FTPRequest(%s, %s)".format(_method, _uri.uri());
110     }
111     string format(string fmt) const {
112         import std.array;
113         import std.stdio;
114         auto a = appender!string();
115         auto f = FormatSpec!char(fmt);
116         while (f.writeUpToNextSpec(a)) {
117             switch(f.spec) {
118                 case 'h':
119                     // Remote hostname.
120                     a.put(_uri.host);
121                     break;
122                 case 'm':
123                     // method.
124                     a.put(_method);
125                     break;
126                 case 'p':
127                     // Remote port.
128                     a.put("%d".format(_uri.port));
129                     break;
130                 case 'P':
131                     // Path
132                     a.put(_uri.path);
133                     break;
134                 case 'q':
135                     // query parameters supplied with url.
136                     a.put(_uri.query);
137                     break;
138                 case 'U':
139                     a.put(_uri.uri());
140                     break;
141                 default:
142                     throw new FormatException("Unknown Request format spec " ~ f.spec);
143             }
144         }
145         return a.data();
146     }
147     ushort sendCmdGetResponse(string cmd, NetworkStream __controlChannel) {
148         debug(requests) tracef("cmd to server: %s", cmd.strip);
149         if ( _verbosity >=1 ) {
150             writefln("> %s", cmd.strip);
151         }
152         __controlChannel.send(cmd);
153         string response = serverResponse(__controlChannel);
154         _responseHistory ~= response;
155         return responseToCode(response);
156     }
157 
158     ushort responseToCode(string response) pure const @safe {
159         return to!ushort(response[0..3]);
160     }
161 
162     void handleChangeURI(in string uri) @safe {
163         // if control channel exists and new URL not match old, then close
164         URI newURI = URI(uri);
165         if ( _controlChannel && 
166             (newURI.host != _uri.host || newURI.port != _uri.port || newURI.username != _uri.username)) {
167             _controlChannel.close();
168             _controlChannel = null;
169         }
170         _uri = newURI;
171     }
172 
173     string serverResponse(NetworkStream __controlChannel) {
174         string res, buffer;
175         immutable bufferLimit = 16*1024;
176         __controlChannel.readTimeout = _timeout;
177         scope(exit) {
178             __controlChannel.readTimeout = 0.seconds;
179         }
180         debug(requests) trace("Wait on control channel");
181         auto b = new ubyte[1];
182         while ( __controlChannel && __controlChannel.isConnected && buffer.length < bufferLimit ) {
183             ptrdiff_t rc;
184             try {
185                 rc = __controlChannel.receive(b);
186             }
187             catch (Exception e) {
188                 error("Failed to read response from server");
189                 throw new FTPServerResponseError("Failed to read server responce over control channel", __FILE__, __LINE__, e);
190             }
191             //debug(requests) tracef("Got %d bytes from control socket", rc);
192             if ( rc == 0 ) {
193                 error("Failed to read response from server");
194                 throw new FTPServerResponseError("Failed to read server responce over control channel", __FILE__, __LINE__);
195             }
196             buffer ~= b[0..rc];
197             if ( buffer.endsWith('\n') ){
198                 if ( _verbosity >= 1 ) {
199                     buffer.
200                     splitLines.
201                     each!(l=>writefln("< %s", l));
202                 }
203                 auto responseLines = buffer.
204                     splitLines.
205                     filter!(l => l.length>3 && l[3]==' ' && l[0..3].all!isDigit);
206                 if ( responseLines.count > 0 ) {
207                     return responseLines.front;
208                 }
209             }
210         }
211         throw new FTPServerResponseError("Failed to read server responce over control channel");
212         assert(0);
213     }
214     ushort tryCdOrCreatePath(string[] path) {
215         /*
216          * At start we stay at original path, we have to create next path element
217          * For example:
218          * path = ["", "a", "b"] - we stay in root (path[0]), we have to cd and return ok
219          * or try to cteate "a" and cd to "a".
220          */
221         debug(requests) info("Trying to create path %s".format(path));
222         enforce(path.length>=2, "You called tryCdOrCreate, but there is nothing to create: %s".format(path));
223         auto next_dir = path[1];
224         auto code = sendCmdGetResponse("CWD " ~ next_dir ~ "\r\n", _controlChannel);
225         if ( code >= 300) {
226             // try to create, then again CWD
227             code = sendCmdGetResponse("MKD " ~ next_dir ~ "\r\n", _controlChannel);
228             if ( code > 300 ) {
229                 return code;
230             }
231             code = sendCmdGetResponse("CWD " ~ next_dir ~ "\r\n", _controlChannel);
232         }
233         if ( path.length == 2 ) {
234             return code;
235         }
236         return tryCdOrCreatePath(path[1..$]);
237     }
238 
239     FTPResponse post(R, A...)(string uri, R content, A args) 
240         if ( __traits(compiles, cast(ubyte[])content) 
241         || (rank!R == 2 && isSomeChar!(Unqual!(typeof(content.front.front)))) 
242         || (rank!R == 2 && (is(Unqual!(typeof(content.front.front)) == ubyte)))
243         )
244     {
245         if ( uri ) {
246             handleChangeURI(uri);
247         }
248         _postData = makeAdapter(content);
249         return post();
250     }
251 
252     FTPResponse post()
253     {
254         string response;
255         ushort code;
256 
257         _response = new FTPResponse;
258         _response._startedAt = Clock.currTime;
259         _method = "POST";
260 
261         scope(exit) {
262             _response._finishedAt = Clock.currTime;
263         }
264 
265         _response.uri = _uri;
266         _response.finalURI = _uri;
267 
268         _controlChannel = _cm.get(_uri.scheme, _uri.host, _uri.port);
269         
270         if ( !_controlChannel ) {
271             _controlChannel = new TCPStream();
272             _controlChannel.connect(_uri.host, _uri.port, _timeout);
273             response = serverResponse(_controlChannel);
274             _responseHistory ~= response;
275             
276             code = responseToCode(response);
277             debug(requests) tracef("Server initial response: %s", response);
278             if ( code/100 > 2 ) {
279                 _response.code = code;
280                 return _response;
281             }
282             // Log in
283             string user, pass;
284             if ( _authenticator ) {
285                 user = _authenticator.userName();
286                 pass = _authenticator.password();
287             }
288             else{
289                 user = _uri.username.length ? _uri.username : "anonymous";
290                 pass = _uri.password.length ? _uri.password : "requests@";
291             }
292             debug(requests) tracef("Use %s:%s%s as username:password", user, pass[0], replicate("-", pass.length-1));
293             
294             code = sendCmdGetResponse("USER " ~ user ~ "\r\n", _controlChannel);
295             if ( code/100 > 3 ) {
296                 _response.code = code;
297                 return _response;
298             } else if ( code/100 == 3) {
299                 
300                 code = sendCmdGetResponse("PASS " ~ pass ~ "\r\n", _controlChannel);
301                 if ( code/100 > 2 ) {
302                     _response.code = code;
303                     return _response;
304                 }
305             }
306             
307         }
308         code = sendCmdGetResponse("PWD\r\n", _controlChannel);
309         string pwd;
310         if ( code/100 == 2 ) {
311             // like '257 "/home/testuser"'
312             auto a = _responseHistory[$-1].split();
313             if ( a.length > 1 ) {
314                 pwd = a[1].chompPrefix(`"`).chomp(`"`);
315             }
316         }
317         scope (exit) {
318             if ( pwd && _controlChannel ) {
319                 sendCmdGetResponse("CWD " ~ pwd ~ "\r\n", _controlChannel);
320             }
321         }
322 
323         auto path = dirName(_uri.path);
324         if ( path != "/") {
325             path = path.chompPrefix("/");
326         }
327         code = sendCmdGetResponse("CWD " ~ path ~ "\r\n", _controlChannel);
328         if ( code == 550 ) {
329             // try to create directory end enter it
330             code = tryCdOrCreatePath(dirName(_uri.path).split('/'));
331         }
332         if ( code/100 > 2 ) {
333             _response.code = code;
334             return _response;
335         }
336 
337         code = sendCmdGetResponse("PASV\r\n", _controlChannel);
338         if ( code/100 > 2 ) {
339             _response.code = code;
340             return _response;
341         }
342         // something like  "227 Entering Passive Mode (132,180,15,2,210,187)" expected
343         // in last response.
344         // Cut anything between ( and )
345         auto v = _responseHistory[$-1].findSplitBefore(")")[0].findSplitAfter("(")[1];
346         string host;
347         ushort port;
348         try {
349             ubyte a1,a2,a3,a4,p1,p2;
350             formattedRead(v, "%d,%d,%d,%d,%d,%d", &a1, &a2, &a3, &a4, &p1, &p2);
351             host = std.format.format("%d.%d.%d.%d", a1, a2, a3, a4);
352             port = (p1<<8) + p2;
353         } catch (FormatException e) {
354             error("Failed to parse ", v);
355             _response.code = 500;
356             return _response;
357         }
358 
359         auto dataStream = new TCPStream();
360         scope (exit ) {
361             if ( dataStream !is null ) {
362                 dataStream.close();
363             }
364         }
365 
366         dataStream.connect(host, port, _timeout);
367 
368         code = sendCmdGetResponse("TYPE I\r\n", _controlChannel);
369         if ( code/100 > 2 ) {
370             _response.code = code;
371             return _response;
372         }
373 
374         code = sendCmdGetResponse("STOR " ~ baseName(_uri.path) ~ "\r\n", _controlChannel);
375         if ( code/100 > 1 ) {
376             _response.code = code;
377             return _response;
378         }
379         size_t uploaded;
380         while ( !_postData.empty ) {
381             auto chunk = _postData.front;
382             uploaded += chunk.length;
383             dataStream.send(chunk);
384             _postData.popFront;
385         }
386         debug(requests) tracef("sent");
387         dataStream.close();
388         dataStream = null;
389         response = serverResponse(_controlChannel);
390         code = responseToCode(response);
391         if ( code/100 == 2 ) {
392             debug(requests) tracef("Successfully uploaded %d bytes", uploaded);
393         }
394         _response.code = code;
395         return _response;
396     }
397     private auto connectData(string v)
398     {
399         string host;
400         ushort port;
401 
402         ubyte a1,a2,a3,a4,p1,p2;
403         formattedRead(v, "%d,%d,%d,%d,%d,%d", &a1, &a2, &a3, &a4, &p1, &p2);
404         host = std.format.format("%d.%d.%d.%d", a1, a2, a3, a4);
405         port = (p1<<8) + p2;
406 
407         auto dataStream = new TCPStream();
408         dataStream.bind(_bind);
409         dataStream.connect(host, port, _timeout);
410         return dataStream;
411     }
412 
413     FTPResponse get(string uri = null) {
414         enforce( uri || _uri.host, "FTP URL undefined");
415         string response;
416         ushort code;
417 
418         _response = new FTPResponse;
419         _contentReceived = 0;
420         _method = "GET";
421 
422         _response._startedAt = Clock.currTime;
423         scope(exit) {
424             _response._finishedAt = Clock.currTime;
425         }
426 
427         if ( uri ) {
428             handleChangeURI(uri);
429         }
430 
431         _response.uri = _uri;
432         _response.finalURI = _uri;
433 
434         _controlChannel = _cm.get(_uri.scheme, _uri.host, _uri.port);
435         
436         if ( !_controlChannel ) {
437             _controlChannel = new TCPStream();
438             _controlChannel.bind(_bind);
439             _controlChannel.connect(_uri.host, _uri.port, _timeout);
440             if ( auto purged_connection = _cm.put(_uri.scheme, _uri.host, _uri.port, _controlChannel) )
441             {
442                 debug(requests) tracef("closing purged connection %s", purged_connection);
443                 purged_connection.close();
444             }
445             _response._connectedAt = Clock.currTime;
446             response = serverResponse(_controlChannel);
447             _responseHistory ~= response;
448             
449             code = responseToCode(response);
450             debug(requests) tracef("Server initial response: %s", response);
451             if ( code/100 > 2 ) {
452                 _response.code = code;
453                 return _response;
454             }
455             // Log in
456             string user, pass;
457             if ( _authenticator ) {
458                 user = _authenticator.userName();
459                 pass = _authenticator.password();
460             }
461             else{
462                 user = _uri.username.length ? _uri.username : "anonymous";
463                 pass = _uri.password.length ? _uri.password : "requests@";
464             }
465             debug(requests) tracef("Use %s:%s%s as username:password", user, pass[0], replicate("-", pass.length-1));
466             
467             code = sendCmdGetResponse("USER " ~ user ~ "\r\n", _controlChannel);
468             if ( code/100 > 3 ) {
469                 _response.code = code;
470                 return _response;
471             } else if ( code/100 == 3) {
472                 
473                 code = sendCmdGetResponse("PASS " ~ pass ~ "\r\n", _controlChannel);
474                 if ( code/100 > 2 ) {
475                     _response.code = code;
476                     return _response;
477                 }
478             }
479         }
480         else {
481             _response._connectedAt = Clock.currTime;
482         }
483 
484         code = sendCmdGetResponse("PWD\r\n", _controlChannel);
485         string pwd;
486         if ( code/100 == 2 ) {
487             // like '257 "/home/testuser"'
488             auto a = _responseHistory[$-1].split();
489             if ( a.length > 1 ) {
490                 pwd = a[1].chompPrefix(`"`).chomp(`"`);
491             }
492         }
493         scope (exit) {
494             if ( pwd && _controlChannel && !_useStreaming ) {
495                 sendCmdGetResponse("CWD " ~ pwd ~ "\r\n", _controlChannel);
496             }
497         }
498         
499         auto path = dirName(_uri.path);
500         if ( path != "/") {
501             path = path.chompPrefix("/");
502         }
503         code = sendCmdGetResponse("CWD " ~ path ~ "\r\n", _controlChannel);
504         if ( code/100 > 2 ) {
505             _response.code = code;
506             return _response;
507         }
508 
509         code = sendCmdGetResponse("TYPE I\r\n", _controlChannel);
510         if ( code/100 > 2 ) {
511             _response.code = code;
512             return _response;
513         }
514         
515         code = sendCmdGetResponse("SIZE " ~ baseName(_uri.path) ~ "\r\n", _controlChannel);
516         if ( code/100 == 2 ) {
517             // something like 
518             // 213 229355520
519             auto s = _responseHistory[$-1].findSplitAfter(" ");
520             if ( s.length ) {
521                 try {
522                     _contentLength = to!long(s[1]);
523                 } catch (ConvException) {
524                     debug(requests) trace("Failed to convert string %s to file size".format(s[1]));
525                 }
526             }
527         }
528 
529         if ( _maxContentLength > 0 && _contentLength > _maxContentLength ) {
530             throw new RequestException("maxContentLength exceeded for ftp data");
531         }
532 
533         code = sendCmdGetResponse("PASV\r\n", _controlChannel);
534         if ( code/100 > 2 ) {
535             _response.code = code;
536             return _response;
537         }
538         // something like  "227 Entering Passive Mode (132,180,15,2,210,187)" expected
539         // in last response.
540         // Cut anything between ( and )
541         auto v = _responseHistory[$-1].findSplitBefore(")")[0].findSplitAfter("(")[1];
542 
543         TCPStream dataStream;
544         try{
545             dataStream = connectData(v);
546         } catch (FormatException e) {
547             error("Failed to parse ", v);
548             _response.code = 500;
549             return _response;
550         }
551         scope (exit ) {
552             if ( dataStream !is null && !_response._receiveAsRange.activated ) {
553                 dataStream.close();
554             }
555         }
556 
557         _response._requestSentAt = Clock.currTime;
558         
559         code = sendCmdGetResponse("RETR " ~ baseName(_uri.path) ~ "\r\n", _controlChannel);
560         if ( code/100 > 1 && code/100 < 5) {
561             _response.code = code;
562             return _response;
563         }
564         if ( code/100 == 5) {
565             dataStream.close();
566             code = sendCmdGetResponse("PASV\r\n", _controlChannel);
567             if ( code/100 > 2 ) {
568                 _response.code = code;
569                 return _response;
570             }
571             v = _responseHistory[$-1].findSplitBefore(")")[0].findSplitAfter("(")[1];
572             dataStream = connectData(v);
573             code = sendCmdGetResponse("NLST " ~ _uri.path ~ "\r\n", _controlChannel);
574             if ( code/100 > 1 ) {
575                 _response.code = code;
576                 return _response;
577             }
578         }
579 
580         dataStream.readTimeout = _timeout;
581         while ( true ) {
582             auto b = new ubyte[_bufferSize];
583             auto rc = dataStream.receive(b);
584             if ( rc <= 0 ) {
585                 debug(requests) trace("done");
586                 break;
587             }
588             debug(requests) tracef("got %d bytes from data channel", rc);
589 
590             _contentReceived += rc;
591             _response._responseBody.putNoCopy(b[0..rc]);
592 
593             if ( _maxContentLength && _response._responseBody.length >= _maxContentLength ) {
594                 throw new RequestException("maxContentLength exceeded for ftp data");
595             }
596             if ( _useStreaming ) {
597                 debug(requests) trace("ftp uses streaming");
598 
599                 auto __maxContentLength = _maxContentLength;
600                 auto __contentLength = _contentLength;
601                 auto __contentReceived = _contentReceived;
602                 auto __bufferSize = _bufferSize;
603                 auto __dataStream = dataStream;
604                 auto __controlChannel = _controlChannel;
605 
606                 _response._contentLength = _contentLength;
607                 _response.receiveAsRange.activated = true;
608                 _response.receiveAsRange.data.length = 0;
609                 _response.receiveAsRange.data = _response._responseBody.data;
610                 _response.receiveAsRange.read = delegate ubyte[] () {
611                     Buffer!ubyte result;
612                     while(true) {
613                         // check if we received everything we need
614                         if ( __maxContentLength > 0 && __contentReceived >= __maxContentLength ) 
615                         {
616                             throw new RequestException("ContentLength > maxContentLength (%d>%d)".
617                                 format(__contentLength, __maxContentLength));
618                         }
619                         // have to continue
620                         auto b = new ubyte[__bufferSize];
621                         ptrdiff_t read;
622                         try {
623                             read = __dataStream.receive(b);
624                         }
625                         catch (Exception e) {
626                             throw new RequestException("streaming_in error reading from socket", __FILE__, __LINE__, e);
627                         }
628 
629                         if ( read > 0 ) {
630                             _response._contentReceived += read;
631                             __contentReceived += read;
632                             result.putNoCopy(b[0..read]);
633                             return result.data;
634                         }
635                         if ( read == 0 ) {
636                             debug(requests) tracef("streaming_in: server closed connection");
637                             __dataStream.close();
638                             code = responseToCode(serverResponse(__controlChannel));
639                             if ( code/100 == 2 ) {
640                                 debug(requests) tracef("Successfully received %d bytes", _response._responseBody.length);
641                             }
642                             _response.code = code;
643                             sendCmdGetResponse("CWD " ~ pwd ~ "\r\n", __controlChannel);
644                             break;
645                         }
646                     }
647                     return result.data;
648                 };
649                 debug(requests) tracef("leave streaming get");
650                 return _response;
651             }
652         }
653         dataStream.close();
654         response = serverResponse(_controlChannel);
655         code = responseToCode(response);
656         if ( code/100 == 2 ) {
657             debug(requests) tracef("Successfully received %d bytes", _response._responseBody.length);
658         }
659         _response.code = code;
660         return _response;
661     }
662     FTPResponse list(string uri = null) {
663         enforce( uri || _uri.host, "FTP URL undefined");
664         string response;
665         ushort code;
666 
667         _response = new FTPResponse;
668         _contentReceived = 0;
669         _method = "GET";
670 
671         _response._startedAt = Clock.currTime;
672         scope(exit) {
673             _response._finishedAt = Clock.currTime;
674         }
675 
676         if ( uri ) {
677             handleChangeURI(uri);
678         }
679 
680         _response.uri = _uri;
681         _response.finalURI = _uri;
682 
683         _controlChannel = _cm.get(_uri.scheme, _uri.host, _uri.port);
684 
685         if ( !_controlChannel ) {
686             _controlChannel = new TCPStream();
687             _controlChannel.bind(_bind);
688             _controlChannel.connect(_uri.host, _uri.port, _timeout);
689             if ( auto purged_connection = _cm.put(_uri.scheme, _uri.host, _uri.port, _controlChannel) )
690             {
691                 debug(requests) tracef("closing purged connection %s", purged_connection);
692                 purged_connection.close();
693             }
694             _response._connectedAt = Clock.currTime;
695             response = serverResponse(_controlChannel);
696             _responseHistory ~= response;
697 
698             code = responseToCode(response);
699             debug(requests) tracef("Server initial response: %s", response);
700             if ( code/100 > 2 ) {
701                 _response.code = code;
702                 return _response;
703             }
704             // Log in
705             string user, pass;
706             if ( _authenticator ) {
707                 user = _authenticator.userName();
708                 pass = _authenticator.password();
709             }
710             else{
711                 user = _uri.username.length ? _uri.username : "anonymous";
712                 pass = _uri.password.length ? _uri.password : "requests@";
713             }
714             debug(requests) tracef("Use %s:%s%s as username:password", user, pass[0], replicate("-", pass.length-1));
715 
716             code = sendCmdGetResponse("USER " ~ user ~ "\r\n", _controlChannel);
717             if ( code/100 > 3 ) {
718                 _response.code = code;
719                 return _response;
720             } else if ( code/100 == 3) {
721                 code = sendCmdGetResponse("PASS " ~ pass ~ "\r\n", _controlChannel);
722                 if ( code/100 > 2 ) {
723                     _response.code = code;
724                     return _response;
725                 }
726             }
727         }
728         else {
729             _response._connectedAt = Clock.currTime;
730         }
731 
732         code = sendCmdGetResponse("PWD\r\n", _controlChannel);
733         string pwd;
734         if ( code/100 == 2 ) {
735             // like '257 "/home/testuser"'
736             auto a = _responseHistory[$-1].split();
737             if ( a.length > 1 ) {
738                 pwd = a[1].chompPrefix(`"`).chomp(`"`);
739             }
740         }
741         scope (exit) {
742             if ( pwd && _controlChannel && !_useStreaming ) {
743                 sendCmdGetResponse("CWD " ~ pwd ~ "\r\n", _controlChannel);
744             }
745         }
746 
747         auto path = dirName(_uri.path);
748         if ( path != "/") {
749             path = path.chompPrefix("/");
750         }
751         code = sendCmdGetResponse("CWD " ~ path ~ "\r\n", _controlChannel);
752         if ( code/100 > 2 ) {
753             _response.code = code;
754             return _response;
755         }
756 
757         code = sendCmdGetResponse("TYPE I\r\n", _controlChannel);
758         if ( code/100 > 2 ) {
759             _response.code = code;
760             return _response;
761         }
762 
763         code = sendCmdGetResponse("SIZE " ~ baseName(_uri.path) ~ "\r\n", _controlChannel);
764         if ( code/100 == 2 ) {
765             // something like
766             // 213 229355520
767             auto s = _responseHistory[$-1].findSplitAfter(" ");
768             if ( s.length ) {
769                 try {
770                     _contentLength = to!long(s[1]);
771                 } catch (ConvException) {
772                     debug(requests) trace("Failed to convert string %s to file size".format(s[1]));
773                 }
774             }
775         }
776 
777         if ( _maxContentLength > 0 && _contentLength > _maxContentLength ) {
778             throw new RequestException("maxContentLength exceeded for ftp data");
779         }
780 
781         code = sendCmdGetResponse("PASV\r\n", _controlChannel);
782         if ( code/100 > 2 ) {
783             _response.code = code;
784             return _response;
785         }
786         // something like  "227 Entering Passive Mode (132,180,15,2,210,187)" expected
787         // in last response.
788         // Cut anything between ( and )
789         auto v = _responseHistory[$-1].findSplitBefore(")")[0].findSplitAfter("(")[1];
790 
791         TCPStream dataStream;
792         try{
793             dataStream = connectData(v);
794         } catch (FormatException e) {
795             error("Failed to parse ", v);
796             _response.code = 500;
797             return _response;
798         }
799         scope (exit ) {
800             if ( dataStream !is null && !_response._receiveAsRange.activated ) {
801                 dataStream.close();
802             }
803         }
804 
805         _response._requestSentAt = Clock.currTime;
806 
807         code = sendCmdGetResponse("LIST " ~ baseName(_uri.path) ~ "\r\n", _controlChannel);
808         if ( code/100 > 1 && code/100 < 5) {
809             _response.code = code;
810             return _response;
811         }
812         if ( code/100 == 5) {
813             dataStream.close();
814             code = sendCmdGetResponse("PASV\r\n", _controlChannel);
815             if ( code/100 > 2 ) {
816                 _response.code = code;
817                 return _response;
818             }
819             v = _responseHistory[$-1].findSplitBefore(")")[0].findSplitAfter("(")[1];
820             dataStream = connectData(v);
821             code = sendCmdGetResponse("NLST " ~ _uri.path ~ "\r\n", _controlChannel);
822             if ( code/100 > 1 ) {
823                 _response.code = code;
824                 return _response;
825             }
826         }
827 
828         dataStream.readTimeout = _timeout;
829         while ( true ) {
830             auto b = new ubyte[_bufferSize];
831             auto rc = dataStream.receive(b);
832             if ( rc <= 0 ) {
833                 debug(requests) trace("done");
834                 break;
835             }
836             debug(requests) tracef("got %d bytes from data channel", rc);
837 
838             _contentReceived += rc;
839             _response._responseBody.putNoCopy(b[0..rc]);
840 
841             if ( _maxContentLength && _response._responseBody.length >= _maxContentLength ) {
842                 throw new RequestException("maxContentLength exceeded for ftp data");
843             }
844             if ( _useStreaming ) {
845                 debug(requests) trace("ftp uses streaming");
846 
847                 auto __maxContentLength = _maxContentLength;
848                 auto __contentLength = _contentLength;
849                 auto __contentReceived = _contentReceived;
850                 auto __bufferSize = _bufferSize;
851                 auto __dataStream = dataStream;
852                 auto __controlChannel = _controlChannel;
853 
854                 _response._contentLength = _contentLength;
855                 _response.receiveAsRange.activated = true;
856                 _response.receiveAsRange.data.length = 0;
857                 _response.receiveAsRange.data = _response._responseBody.data;
858                 _response.receiveAsRange.read = delegate ubyte[] () {
859                     Buffer!ubyte result;
860                     while(true) {
861                         // check if we received everything we need
862                         if ( __maxContentLength > 0 && __contentReceived >= __maxContentLength ) 
863                         {
864                             throw new RequestException("ContentLength > maxContentLength (%d>%d)".
865                                 format(__contentLength, __maxContentLength));
866                         }
867                         // have to continue
868                         auto b = new ubyte[__bufferSize];
869                         ptrdiff_t read;
870                         try {
871                             read = __dataStream.receive(b);
872                         }
873                         catch (Exception e) {
874                             throw new RequestException("streaming_in error reading from socket", __FILE__, __LINE__, e);
875                         }
876 
877                         if ( read > 0 ) {
878                             _response._contentReceived += read;
879                             __contentReceived += read;
880                             result.putNoCopy(b[0..read]);
881                             return result.data;
882                         }
883                         if ( read == 0 ) {
884                             debug(requests) tracef("streaming_in: server closed connection");
885                             __dataStream.close();
886                             code = responseToCode(serverResponse(__controlChannel));
887                             if ( code/100 == 2 ) {
888                                 debug(requests) tracef("Successfully received %d bytes", _response._responseBody.length);
889                             }
890                             _response.code = code;
891                             sendCmdGetResponse("CWD " ~ pwd ~ "\r\n", __controlChannel);
892                             break;
893                         }
894                     }
895                     return result.data;
896                 };
897                 debug(requests) tracef("leave streaming get");
898                 return _response;
899             }
900         }
901         dataStream.close();
902         response = serverResponse(_controlChannel);
903         code = responseToCode(response);
904         if ( code/100 == 2 ) {
905             debug(requests) tracef("Successfully received %d bytes", _response._responseBody.length);
906         }
907         _response.code = code;
908         return _response;
909     }
910     FTPResponse execute(Request r)
911     {
912         string method = r.method;
913         _uri = r.uri();
914         _authenticator = r.authenticator;
915         _maxContentLength = r.maxContentLength;
916         _useStreaming = r.useStreaming;
917         _verbosity = r.verbosity;
918         _cm = r.cm;
919         _postData = r.postData;
920         _bufferSize = r.bufferSize;
921         _proxy = r.proxy;
922         _bind = r.bind;
923         _timeout = r.timeout;
924 
925         if ( method == "GET" )
926         {
927             return get();
928         }
929         if ( method == "POST" )
930         {
931             return post();
932         }
933         if ( method == "LIST" ) {
934             return list();
935         }
936         assert(0, "Can't handle method %s for ftp request".format(method));
937     }
938 }
939 
940 //package unittest {
941 //    import std.process;
942 //
943 //    globalLogLevel(LogLevel.info);
944 //    bool unreliable_network = environment.get("UNRELIABLENETWORK", "false") == "true";
945 //
946 //    info("testing ftp");
947 //    auto rq = FTPRequest();
948 //    info("ftp post ", "ftp://speedtest.tele2.net/upload/TEST.TXT");
949 //    auto rs = rq.post("ftp://speedtest.tele2.net/upload/TEST.TXT", "test, ignore please\n".representation);
950 //    assert(unreliable_network || rs.code == 226);
951 //    info("ftp get  ", "ftp://speedtest.tele2.net/nonexistent", ", in same session.");
952 //    rs = rq.get("ftp://speedtest.tele2.net/nonexistent");
953 //    assert(unreliable_network || rs.code != 226);
954 //    info("ftp get  ", "ftp://speedtest.tele2.net/1KB.zip", ", in same session.");
955 //    rs = rq.get("ftp://speedtest.tele2.net/1KB.zip");
956 //    assert(unreliable_network || rs.code == 226);
957 //    assert(unreliable_network || rs.responseBody.length == 1024);
958 //    info("ftp post ", "ftp://speedtest.tele2.net/upload/TEST.TXT");
959 //    rs = rq.post("ftp://speedtest.tele2.net/upload/TEST.TXT", "another test, ignore please\n".representation);
960 //    assert(unreliable_network || rs.code == 226);
961 //    info("ftp get  ", "ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
962 //    try {
963 //        rs = rq.get("ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
964 //    }
965 //    catch (ConnectError e)
966 //    {
967 //    }
968 //    assert(unreliable_network || rs.code == 226);
969 //    info("ftp get  ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT with authenticator");
970 //    rq.authenticator = new FtpAuthentication("anonymous", "requests@");
971 //    try {
972 //        rs = rq.get("ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
973 //    }
974 //    catch (ConnectError e)
975 //    {
976 //    }
977 //    assert(unreliable_network || rs.code == 226);
978 //    assert(unreliable_network || rs.finalURI.path == "/pub/FreeBSD/README.TXT");
979 //    assert(unreliable_network || rq.format("%m|%h|%p|%P|%q|%U") == "GET|ftp.iij.ad.jp|21|/pub/FreeBSD/README.TXT||ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
980 //    assert(unreliable_network || rs.format("%h|%p|%P|%q|%U") == "ftp.iij.ad.jp|21|/pub/FreeBSD/README.TXT||ftp://ftp.iij.ad.jp/pub/FreeBSD/README.TXT");
981 //    info("testing ftp - done.");
982 //}