module requests.pool;

import std.stdio;
import std.range;
import std.algorithm;
import std.concurrency;
import std.typecons;
import std.variant;
import std.random;
import std.string;
import std.format;
import core.thread;
import std.exception;
import std.experimental.logger;

import requests.request;
import requests.http;
import requests.base;
import requests.uri;
import requests.streams;

/**
 * Description of a single request to be executed by a pool worker.
 *
 * Construct it from a URL (the first field, e.g. `Job("http://host/path")`)
 * and configure it with the setters below. Each setter modifies this value
 * and returns a copy of it, so calls can be chained.
 */
struct Job {
    /// Request methods a worker can execute.
    enum Method {
        GET,
        POST,
    }

    package {
        string _url;
        Method _method = Method.GET;
        immutable(ubyte)[] _data;       // data for post
        immutable(ubyte)[] _opaque;     // opaque data tie request and response
        uint _maxRedirects = 10;
        Duration _timeout = 30.seconds;
        immutable(string)[] _headers_h; // header names, parallel to _headers_v
        immutable(string)[] _headers_v; // header values, parallel to _headers_h
    }

    /// Set the request method.
    auto method(Method m) {
        _method = m;
        return this;
    }
    /**
     * Set the request method by name (case-insensitive).
     * Throws: Exception if the name is neither GET nor POST.
     */
    auto method(string m) {
        switch(m.toUpper()) {
        case "POST":
            _method = Method.POST;
            break;
        case "GET":
            _method = Method.GET;
            break;
        default:
            throw new Exception("Unknown method %s, known methods are GET,POST".format(m));
        }
        return this;
    }
    /// Set the request body (sent when the method is POST).
    auto data(immutable(ubyte)[] d) {
        _data = d;
        return this;
    }
    /// Set the maximum number of redirects passed to the worker's Request.
    auto maxRedirects(uint n) {
        _maxRedirects = n;
        return this;
    }
    /// Set the timeout passed to the worker's Request.
    auto timeout(Duration t) {
        _timeout = t;
        return this;
    }
    /// Attach opaque data; it is copied unchanged into the matching Result.
    auto opaque(immutable(ubyte)[] o) {
        _opaque = o;
        return this;
    }
    /// Append request headers; names and values are stored as immutable copies.
    auto addHeaders(in string[string] headers) {
        foreach(pair; headers.byKeyValue) {
            _headers_h ~= idup(pair.key);
            _headers_v ~= idup(pair.value);
        }
        return this;
    }
}

/**
 * Outcome of one Job, as produced by a worker thread.
 */
struct Result {
    /// Values stored in `_flags`.
    enum {
        OK = 1,
        QUIT = 2,
        EXCEPTION = 4
    }
    package {
        uint _flags;
        ushort _code;
        immutable(ubyte)[] _data;   // response body
        immutable(ubyte)[] _opaque; // opaque data tie request and response
    }
    /// `OK` when the request completed, `EXCEPTION` when it threw.
    auto flags() pure @nogc {
        return _flags;
    }
    /// Response status code; 500 when the worker caught an exception.
    auto code() pure @nogc {
        return _code;
    }
    /// Response body, or the exception message when flags() is EXCEPTION.
    auto data() pure @nogc {
        return _data;
    }
    /// The opaque data attached to the originating Job.
    auto opaque() pure @nogc {
        return _opaque;
    }
}

/// Message that tells a worker thread to stop.
struct Quit {
}

/**
 * Scheme, host and port of a URL. The pool prefers to send a job to a
 * worker that last served an equal Route.
 */
struct Route {
    string scheme;
    string host;
    ushort port;

    @disable this();

    this(string url) {
        URI parsed = URI(url);
        scheme = parsed.scheme;
        host = parsed.host;
        port = parsed.port;
    }
    /// Two routes are equal when scheme, host and port all match.
    bool opEquals(const Route other) const {
        return scheme == other.scheme
            && host == other.host
            && port == other.port;
    }
}


/**
 * Worker thread entry point.
 *
 * Receives `(Tid, Job)` messages and executes each job with one Request
 * object that is reused for every job. Replies to the sender with
 * `(thisTid(), Result)`. Stops on a `(Tid, Quit)` message or when the
 * owner thread terminates.
 */
void worker() {
    Request rq;
    bool run = true;

    // Execute one job. An exception from the request becomes a Result with
    // flags EXCEPTION, code 500 and the exception message as data.
    Result process(ref Job j) {
        debug(requests) tracef("Received job %s", j._url);

        rq.maxRedirects(j._maxRedirects);
        rq.timeout(j._timeout);
        // rq is shared by all jobs, so drop headers set for a previous job
        rq.clearHeaders();
        if ( j._headers_h.length ) {
            auto headers = assocArray(zip(j._headers_h.dup, j._headers_v.dup));
            rq.addHeaders(headers);
        }
        Response rs;
        try {
            final switch(j._method) {
            case Job.Method.GET:
                rs = rq.get(j._url);
                break;
            case Job.Method.POST:
                rs = rq.post(j._url, j._data);
                break;
            }
            return Result(Result.OK, rs.code, assumeUnique(rs.responseBody.data), j._opaque);
        } catch (Exception e) {
            return Result(Result.EXCEPTION, 500, e.msg.representation(), j._opaque);
        }
    }

    try {
        while (run) {
            receive(
                (Tid p, Quit q) {
                    // cmd to quit
                    debug(requests) tracef("got quit");
                    run = false;
                },
                (Tid p, Job j) {
                    // cmd to process
                    debug(requests) tracef("got job");
                    auto r = process(j);
                    p.send(thisTid(), r);
                },
            );
        }
    }
    catch (OwnerTerminated e) {
        debug(requests) tracef("parent terminated");
    }
    catch (Exception e) {
        // the format string needs a placeholder, otherwise `e` is dropped
        errorf("Exception %s", e);
    }
    finally {
        debug(requests) tracef("worker done");
    }
}

/**
 * State shared by all copies of one Pool. `_rc` counts the live copies.
 */
class Manager(R) {
    R _range;                // input range
    uint _workers;           // max num of workers
    Route[Tid] _idle;        // idle workers with last route served
    Route[Tid] _busy;        // busy workers with currently serving route
    Job[Tid] _box;           // one-element queue
    size_t _sent;            // jobs sent to workers
    size_t _received;        // results received from workers
    Nullable!Result _result; // value returned by Pool.front
    uint _rc;                // ref counter
    bool _exhausted;         // set by Pool.popFront once all work is done

    /// True when worker `t` has no job waiting in its box.
    bool boxIsEmpty(Tid t) {
        return _box[t] == Job.init;
    }

    /**
     * Choose a worker for `route`, in this order of preference:
     * 1. an idle worker that last served the same route;
     * 2. a busy worker serving the same route whose box is empty;
     * 3. `Tid.init` when fewer than `_workers` workers exist (caller may create one);
     * 4. otherwise any idle worker.
     */
    auto findWorker(Route route) {
        foreach(t, ref r; _idle) {
            if ( r == route ) {
                // use it
                return t;
            }
        }
        foreach(t, ref r; _busy) {
            if ( r == route && _box[t] == Job.init ) {
                // use it
                return t;
            }
        }
        if ( _busy.length + _idle.length < _workers ) {
            return Tid.init;
        }
        // Pool.popFront only calls this while busy < workers, so when the
        // worker limit is reached at least one worker must be idle.
        assert(_idle.length > 0, "no idle worker available");
        return _idle.keys[0];
    }
}

/**
 * Input range of Results obtained by running the Jobs of `R` on at most
 * `_workers` worker threads. Results are delivered in the order workers
 * reply. Copies share one reference-counted Manager; when the last copy is
 * destroyed, every worker is sent Quit.
 */
struct Pool(R) {
private:
    Manager!R _m;
public:
    string toString() {
        return "Pool<>";
    }

    ~this() {
        // a default-initialized Pool (Pool.init) has no Manager
        if ( _m is null ) {
            return;
        }
        _m._rc--;
        debug(requests) tracef("on ~ rc=%d", _m._rc);
        Tid me = thisTid();
        if ( _m._rc == 0 ) {
            foreach(ref t; _m._busy.keys ~ _m._idle.keys) {
                debug(requests) tracef("sending Quit message to workers");
                t.send(me, Quit());
            }
        }
    }

    this(R input_range, uint w) {
        _m = new Manager!R();
        _m._rc = 1;
        _m._range = input_range;
        _m._workers = w;
    }
    this(this) {
        // copying Pool.init must not touch a Manager that does not exist
        if ( _m is null ) {
            return;
        }
        assert( _m._rc > 0);
        _m._rc++;
        debug(requests) tracef("copy rc=%d", _m._rc);
    }

    /// True when no more results will be produced.
    bool empty() {
        debug(requests) tracef("input empty: %s, exhausted: %s, busy: %d", _m._range.empty(), _m._exhausted, _m._busy.length);
        if ( _m._range.empty() && _m._sent == 0 ) {
            // we didn't start processing and input already empty. Empty input range?
            return true;
        }
        return _m._exhausted;
    }
    /**
        popFront

        Blocks until a busy worker replies and makes that reply the new front.
        Then sends jobs from the input range while fewer than `_workers`
        workers are busy.
    */
    void popFront()
    in
    {
        assert(_m._busy.length > 0 || _m._range.empty || _m._result.isNull);
        assert(_m._busy.length + _m._idle.length <= _m._workers);
    }
    do
    {
        auto owner = thisTid();
        Nullable!Tid idle;      // worker released by the reply received below
        bool result_ready = false;
        debug(requests) tracef("busy: %d, idle: %d, workers: %d", _m._busy.length, _m._idle.length, _m._workers);
        if ( _m._result.isNull ) {
            // case when popFront called without front
            front();
        }
        if ( _m._busy.length > 0 ) {
            receive(
                (Tid t, Result r) {
                    assert(t in _m._busy, "received response not from busy thread");
                    _m._received++;
                    _m._result = r;
                    result_ready = true;
                    if ( ! _m.boxIsEmpty(t) ) {
                        Job j = _m._box[t];
                        assert(Route(j._url) == _m._busy[t], "wrong route");
                        debug(requests) tracef("send job %s from the box", j._url);
                        // have job with the same route, worker is still busy
                        _m._box[t] = Job.init;
                        t.send(owner, j);
                        _m._sent++;
                    } else {
                        // move this thread from busy to idle threads
                        Route route = _m._busy[t];
                        debug(requests) tracef("release busy thread %s", route);
                        _m._busy.remove(t);
                        _m._idle[t] = route;
                        idle = t;
                    }
                }
            );
        }
        while( !_m._range.empty() && _m._busy.length < _m._workers) {
            debug(requests) trace("push next job to pool");
            Job j = _m._range.front();
            _m._range.popFront();
            Route route = Route(j._url);
            /*
            find best worker for this route:
            1. an idle worker that served the same route;
            2. a busy worker serving the same route with an empty box (queue the job);
            3. if more workers may be created: the worker released above, or a new one;
            4. otherwise any idle worker (there must be one anyway).
            */
            auto t = _m.findWorker(route);
            if ( t in _m._busy ) {
                // just place in box
                assert(_m._box[t] == Job.init);
                debug(requests) tracef("found best for %s in busy %s", route, _m._busy[t]);
                _m._box[t] = j;
                continue;
            } else
            if ( t in _m._idle ) {
                debug(requests) tracef("found best for %s in idle %s", route, _m._idle[t]);
                fromIdleToBusy(t, route);
                if ( !idle.isNull && idle.get == t ) {
                    // the released worker is busy again; never hand it out twice
                    idle.nullify();
                }
            } else
            if ( !idle.isNull ) {
                // findWorker returned Tid.init here: switch to the released
                // worker before looking it up in _idle
                t = idle.get;
                idle.nullify();
                debug(requests) tracef("use just released idle (prev job %s) for %s", _m._idle[t], route);
                fromIdleToBusy(t, route);
            } else {
                debug(requests) tracef("create worker for %s", route);
                t = spawn(&worker);
                _m._box[t] = Job.init;
                _m._busy[t] = route;
            }
            t.send(owner, j);
            _m._sent++;
        }
        debug(requests) tracef("input empty: %s, sent: %d, received: %d, busy: %d",
            _m._range.empty, _m._sent, _m._received,_m._busy.length );
        // exhausted only when this call produced no new result and nothing is in flight
        _m._exhausted = !result_ready
            && _m._range.empty
            && _m._sent == _m._received
            && _m._busy.length == 0;
    }
    /**
        front

        Returns the current Result. On the first call it starts workers with
        sendWhilePossible() and blocks until one of them replies.
    */
    Result front()
    out {
        assert(_m._busy.length > 0 || _m._range.empty);
    }
    do {
        if ( !_m._result.isNull ) {
            return _m._result.get;
        }
        Tid w;
        sendWhilePossible();
        receive(
            (Tid t, Result r) {
                debug(requests) trace("received first response");
                _m._result = r;
                // move this thread from busy to idle threads
                fromBusyToIdle(t);
                _m._received++;
                w = t;
            },
        );
        if ( !_m._range.empty && _m._busy.length == 0) {
            // when max number of workers = 1, then
            // at this point we will have only one idle worker,
            // and we need to have at least one busy worker
            // so that we can always read in popFront
            Job j = _m._range.front();
            Route route = Route(j._url);
            w.send(thisTid(), j);
            _m._range.popFront();
            fromIdleToBusy(w, route);
            _m._sent++;
        }
        return _m._result.get;
    }
    /**
        helpers
    */
    /// Move worker `t` from the busy set to the idle set, keeping its route.
    void fromBusyToIdle(Tid t) {
        assert(t in _m._busy);
        assert(t !in _m._idle);
        _m._idle[t] = _m._busy[t];
        _m._busy.remove(t);
    }
    /// Move worker `t` from the idle set to the busy set with route `r` and an empty box.
    void fromIdleToBusy(Tid t, Route r) {
        assert(t !in _m._busy);
        assert(t in _m._idle);
        _m._busy[t] = r;
        _m._box[t] = Job.init;
        _m._idle.remove(t);
    }
    /// Spawn one new worker per job until the input is empty or `_workers` workers exist.
    void sendWhilePossible() {
        while( !_m._range.empty() && (_m._busy.length+_m._idle.length) < _m._workers) {
            Tid t = spawn(&worker);
            Job j = _m._range.front();
            Route route = Route(j._url);

            auto owner = thisTid();
            send(t, owner, j);
            _m._range.popFront();
            _m._busy[t] = route;
            _m._box[t] = Job.init;
            _m._sent++;
        }
    }
}

/**
 * Create a Pool that executes the Jobs of input range `r` on at most `w`
 * worker threads.
 * Throws: Exception if `w` is 0.
 */
Pool!R pool(R)(R r, uint w) {
    enforce(w>0, "Number of workers must be > 0");
    return Pool!R(r, w);
}

version(None) unittest {

    version(vibeD) {
        string httpbinurl = "http://httpbin.org";
    } else {
        info("Testing pool");
        import httpbin;
        auto server = httpbinApp();
        server.start();
        scope(exit) {
            server.stop();
        }
        Thread.sleep(1.seconds);
        globalLogLevel = LogLevel.trace;
        string httpbinurl = "http://127.0.0.1:8081";
        Job[] jobs = [
            Job(httpbinurl ~ "/get").addHeaders([
                "X-Header": "X-Value",
                "Y-Header": "Y-Value"
            ]),
            Job(httpbinurl ~ "/gzip"),
            Job(httpbinurl ~ "/deflate"),
            Job(httpbinurl ~ "/absolute-redirect/3")
                .maxRedirects(2),
            Job(httpbinurl ~ "/range/1024"),
            Job(httpbinurl ~ "/post")
                .method("POST")                 // change default GET to POST
                .data("test".representation())  // attach data for POST
                .opaque("id".representation),   // opaque data - you will receive the same in Result
            Job(httpbinurl ~ "/delay/3")
                .timeout(1.seconds),            // set timeout to 1.seconds - this request will throw exception and fail
            Job(httpbinurl ~ "/stream/1024"),
        ];

        auto count = jobs.
            pool(6).
            filter!(r => r.code==200).
            count();

        assert(count == jobs.length - 2, "pool test failed");
        iota(20)
            .map!(n => Job(httpbinurl ~ "/post")
                .data("%d".format(n).representation))
            .pool(10)
            .each!(r => assert(r.code==200));
        info("Testing pool - done");
    }
}