module requests.connmanager;

import std.typecons;
import std.datetime;
import std.array;
import std.algorithm;
import std.exception;

import std.experimental.logger;

import requests.streams;

import cachetools.cache;

/**
 * Keeps opened connections for HTTP.
 *
 * It is a size-limited cache mapping tuple(schema, host, port) -> connection,
 * backed by `CacheLRU`. When the cache is full the least recently used
 * entry is evicted.
 *
 * After `clear()` (or when default-initialized) the manager holds no cache:
 * `get` and `del` return null, `length` returns 0, and `put` must not be called.
 */
package struct ConnManager {
    /// Cache key: (schema, host, port).
    package alias CMKey = Tuple!(string, string, ushort);
    /// Cache value: the stream plus the time it was stored.
    package struct CMValue {
        NetworkStream stream;
        SysTime       timestamp;
    }
    private {
        CacheLRU!(CMKey, CMValue) __cache;
    }

    /**
     * Create a manager holding at most `limit` connections.
     * Cache events are enabled so that evicted/removed streams can be
     * handed back to the caller.
     */
    this(int limit) {
        __cache = new CacheLRU!(CMKey, CMValue);
        __cache.size = limit;
        __cache.enableCacheEvents();
    }

    /// Closes every cached connection (see `clear`).
    ~this() {
        clear();
    }

    /// Number of cached connections; 0 when the manager has no cache.
    @property auto length() {
        // Ternary (rather than an early return) keeps a single deduced
        // return type for `auto`.
        return __cache is null ? 0 : __cache.length;
    }

    /**
     * Return the stream carried by the single pending cache event, or null
     * if there is no pending event.
     *
     * Every mutating operation here is expected to produce at most one
     * event; anything else is treated as a logic error.
     * NOTE(review): presumably `cacheEvents()` also drains the pending
     * events - confirm against the cachetools documentation.
     */
    private NetworkStream pendingEventStream() {
        auto events = __cache.cacheEvents();
        switch ( events.length )
        {
            case 0:
                return null;
            case 1:
                return events.front.val.stream;
            default:
                assert(0);
        }
    }

    ///
    /// Put new stream in cache; if that displaced an older stream, return it.
    /// If nothing was displaced return null. The returned connection is no
    /// longer referenced by the cache and can be closed by the caller.
    ///
    /// The manager must still own a cache (i.e. it was constructed with a
    /// limit and `clear()` has not been called).
    ///
    NetworkStream put(string schema, string host, ushort port, NetworkStream stream)
    in {
        assert(stream !is null);
        assert(__cache !is null, "put() called on a cleared or uninitialized ConnManager");
    }
    out { assert(__cache.length > 0); }
    do {
        CMKey key = CMKey(schema, host, port);
        CMValue value = {stream: stream, timestamp: Clock.currTime};
        __cache.put(key, value);
        return pendingEventStream();
    }

    /**
        Lookup connection.

        Returns: the cached stream, or null if there is no entry for the key
                 or the manager has no cache.
    */
    NetworkStream get(string schema, string host, ushort port)
    do
    {
        if ( __cache is null ) return null;
        auto v = __cache.get(CMKey(schema, host, port));
        if ( ! v.isNull() )
        {
            return v.get.stream;
        }
        return null;
    }

    /**
        Remove connection from cache (without close).

        Returns: the removed stream, or null if nothing was removed or the
                 manager has no cache.
    */
    NetworkStream del(string schema, string host, ushort port) {
        // Same guard as get()/clear(): nothing to remove without a cache.
        if ( __cache is null ) return null;

        CMKey key = CMKey(schema, host, port);
        __cache.remove(key);
        return pendingEventStream();
    }

    /**
        Clear cache (and close connections).

        Errors raised while closing an individual stream are only traced
        (in debug(requests) builds) so that the remaining streams still get
        closed. Afterwards the manager holds no cache.
    */
    void clear()
    out { assert(__cache is null || __cache.length == 0); }
    do {
        if ( __cache is null ) return;

        __cache.clear();
        foreach(event; __cache.cacheEvents )
        {
            try
            {
                event.val.stream.close();
            }
            catch(Exception ex)
            {
                debug(requests) tracef("%s while clear connmanager", ex.msg);
            }
        }
        __cache = null;
    }
}

unittest {
    globalLogLevel = LogLevel.info;
    ConnManager cm = ConnManager(2);
    auto s0 = new TCPStream();
    auto s1 = new TCPStream();
    auto s2 = new TCPStream();

    auto e = cm.put("http", "s0", 1, s0);
    assert(e is null);
    assert(cm.get("http", "s0", 1) == s0);

    e = cm.put("http", "s1", 1, s1);
    assert(e is null);
    assert(cm.get("http", "s1", 1) == s1);

    e = cm.put("http", "s2", 1, s2);
    assert(e !is null);
    assert(cm.get("http", "s2", 1) == s2);
    assert(e == s0); // oldest
    e.close();

    // at this moment we have s1, s2
    // let try to update s1
    auto s3 = new TCPStream;
    e = cm.put("http", "s1", 1, s3);
    assert(e == s1);
    e.close();
    assert(cm.get("http", "s1", 1) == s3);

    cm.clear();
    assert(cm.get("http", "s1", 1) is null);

    // a cleared manager must stay safe to query
    assert(cm.length == 0);
    assert(cm.del("http", "s1", 1) is null);

    // so must a default-initialized one
    ConnManager empty;
    assert(empty.length == 0);
    assert(empty.get("http", "s0", 1) is null);
    assert(empty.del("http", "s0", 1) is null);
}