/++
	Fiber-based socket i/o built on Phobos' std.socket and Socket.select without any other dependencies.

	This is meant to be a single-threaded event-driven basic network server.

	---
	void main() {
		auto fm = new FiberManager();
		// little tcp echo server
		// exits when it gets "QUIT" on the socket.
		Socket listener;
		listener = fm.listenTcp6(6660, (Socket conn) {
			while(true) {
				char[128] buffer;
				auto ret = conn.receive(buffer[]);
				// keeps the Phobos interface so...
				if(ret <= 0) // ...still need to check return values
					break;
				auto got = buffer[0 .. ret];
				if(got.length >= 4 && got[0 .. 4] == "QUIT") {
					listener.close();
					break;
				} else {
					conn.send(got);
				}
			}
			conn.close();
		});

		// simultaneously listen for and echo UDP packets
		fm.makeFiber( () {
			auto sock = fm.bindUdp4(9999);
			char[128] buffer;
			Address addr;
			while(true) {
				auto ret = sock.receiveFrom(buffer[], addr);
				if(ret <= 0)
					break;
				import std.stdio;
				auto got = buffer[0 .. ret];
				// print it to the console
				writeln("Received UDP ", got);
				// send the echo
				sock.sendTo(got, addr);

				// same check as the tcp handler, so a bare "QUIT" packet counts too
				if(got.length >= 4 && got[0 .. 4] == "QUIT") {
					break; // stop processing udp when told to quit too
				}
			}
		}).call(); // need to call it the first time ourselves to get it started

		// run the events. This keeps going until there are no more registered events;
		// so when all registered sockets are closed or abandoned.
		//
		// So this will return when both QUIT messages are received and all clients disconnect.
		import std.stdio;
		writeln("Entering.");

		fm.run();

		writeln("Exiting.");
	}
	---

	Note that DNS address lookups here may still block the whole thread, but other methods on `Socket` are overridden in the subclass ([FiberSocket]) to `yield` appropriately, so you should be able to reuse most existing code that uses Phobos' Socket with little to no modification. However, since it keeps the same interface as the original object, remember you still need to check your return values!

	There are two big differences:

	$(NUMBERED_LIST
		* You should not modify the `blocking` flag on the Sockets. It is already set for you and changing it will... probably not hurt, but definitely won't help.

		* You shouldn't construct the Sockets yourself, nor call `connect` or `listen` on them. Instead, use the methods in the [FiberManager] class. It will ensure you get the right objects initialized in the right way with the minimum amount of blocking.

		The `listen` family of functions accepts a delegate that is called once per connection in a fresh fiber. The `connect` family of functions can only be used from inside an existing fiber - if you do it in a connection handler from listening, it is already set up. If it is from your main thread though, you'll get an assert error unless you make your own fiber ahead of time. [FiberManager.makeFiber] can construct one for you, or you can call `new Fiber(...)` from `import core.thread.fiber` yourself. Put all the work with the connection inside that fiber so the manager can do its work most efficiently.
	)

	There are several convenience functions to construct addresses for you too, or you may simply do `getAddress` or `new InternetAddress` and friends from `std.socket` yourself.

	$(H2 Conceptual Overview)

	A socket is a common programming object for communication over a network. Phobos has support for the basics and you can read more about that in my blog socket tutorial: http://dpldocs.info/this-week-in-d/Blog.Posted_2019_11_11.html

	A lot of things describe [core.thread.fiber.Fiber|fibers] as lightweight threads, and that's not wrong, but I think that actually overcomplicates them. I prefer to think of a fiber as a function that can pause itself. You call it like a function, you write it like a function, but instead of always completing and returning, it can [core.thread.fiber.Fiber.yield|yield], which puts itself on pause and returns to the caller. The caller then has a chance to resume the function when it chooses to simply by [core.thread.fiber.Fiber.call|calling] it again, and it picks up where it left off, or the caller can [core.thread.fiber.Fiber.reset|reset] the fiber function to the beginning and start over.
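
	To make the "function that can pause itself" idea concrete, here is a minimal sketch that uses only druntime's `core.thread.fiber`. The `pauseDemo` function and its `step` counter are made up for illustration; they are not part of this module.

	---
	import core.thread.fiber;

	// Hypothetical demo: records which step the fiber had reached after each call.
	int[] pauseDemo() {
		int step;
		int[] seen;
		auto fiber = new Fiber(() {
			step = 1;
			Fiber.yield(); // pause and return to whoever called us
			step = 2;
			Fiber.yield();
			step = 3;
		});

		fiber.call(); seen ~= step; // ran up to the first yield
		fiber.call(); seen ~= step; // resumed right after the first yield
		fiber.call(); seen ~= step; // ran to completion
		assert(fiber.state == Fiber.State.TERM);

		fiber.reset(); // rewind the function to the beginning
		fiber.call(); seen ~= step; // so the first step runs again
		return seen;
	}

	void main() {
		assert(pauseDemo() == [1, 2, 3, 1]);
	}
	---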

	Fiber-based async i/o thus isn't as complicated as it sounds. The basic idea is you just write an ordinary function in the same style as if you were doing linear, blocking i/o calls, but instead of actually blocking, you register a callback to be woken up when the call can succeed, then yield yourself. This callback you register is simply your own fiber resume method; the event loop picks up where you left off.

	With Phobos sockets (and most Unix i/o functions), you then retry the operation that would have blocked and carry on because the callback is triggered when the operation is ready. If you're using another async system, like Windows' Overlapped I/O callbacks, it is actually even easier, since that callback happens when the operation has already completed. In those cases, you register the fiber's resume function as the event callback, then yield. When you wake up, you can immediately carry on.

	When a fiber is woken up, it continues executing from the last `yield` call. Just think of `yield` as being a pause button you press.

	Understanding how it works means you can translate any callback-based i/o system to use fibers, since it would always follow that same pattern: register the fiber resume method, then yield. If it is a callback when the operation is ready, try it again when you wake up (so right after yield, you can loop back to the call), or if it is a callback when the operation is complete, you can immediately use the result when you wake up (so right after yield, you use it).
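
	A rough sketch of the "retry when ready" flavor of that pattern follows. Everything in it is a hypothetical stand-in: `callbacks` plays the role of an event loop's callback list, and `tryRead` pretends to be a non-blocking call that returns -1 until data is ready. A completion-style callback would look the same minus the retry loop.

	---
	import core.thread.fiber;

	// Hypothetical stand-ins for a real event source.
	void delegate()[] callbacks; // what an event loop would call when ready
	bool dataReady;

	// Pretend non-blocking read: -1 means "would have blocked".
	int tryRead() {
		return dataReady ? 42 : -1;
	}

	// Written like a blocking call, but it yields instead of blocking.
	int fiberRead() {
		auto self = Fiber.getThis();
		while(true) {
			auto r = tryRead();
			if(r != -1)
				return r;
			callbacks ~= () { self.call(); }; // register our own resume method
			Fiber.yield(); // pause until the "event loop" calls us back
			// woken up: loop back and retry the operation
		}
	}

	int retryDemo() {
		callbacks = null;
		dataReady = false;
		int result;
		auto fiber = new Fiber(() { result = fiberRead(); });
		fiber.call(); // the read would block, so the fiber registers and yields
		assert(fiber.state == Fiber.State.HOLD && callbacks.length == 1);

		dataReady = true; // the event happens...
		auto cb = callbacks[0];
		callbacks = callbacks[1 .. $];
		cb(); // ...and the loop runs the callback, which resumes the fiber
		assert(fiber.state == Fiber.State.TERM);
		return result;
	}

	void main() {
		assert(retryDemo() == 42);
	}
	---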

	How does the event loop work? How do you know what fiber runs next? See, this is where the "lightweight thread" explanation complicates things. With a thread, the operating system is responsible for scheduling them and might even run several simultaneously. Fibers are much simpler: again, think of them as just being a function that can pause itself. Like with an ordinary function, just one runs at a time (in your thread anyway, of course adding threads can complicate fibers like it can complicate any other function). Like with an ordinary function, YOU choose which one you want to call and when. And when a fiber `yield`s, it is very much like an ordinary function `return`ing - it passes control back to you, the caller. The only difference is the Fiber object remembers where the function was when it yielded, so you can ask it to pick up where it left off.

	The event loop therefore doesn't look all that special. If you've used `Socket.select` before, you'll recognize most of it. (`select` can be tricky to use though; `epoll`-based code is actually simpler and more efficient... but this module only wanted to use Phobos' std.socket on its own. Besides, `select` still isn't that complicated, is cross-platform, and performs well enough for most tasks anyway.) It has a list of active sockets that it adds to either a read or write set, it calls the select function, then it loops back over and handles the events, if set. The only special thing is the event handler resumes the fiber instead of some other action.
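
	Here is a stripped-down sketch of that idea, assuming a single socket and a single fiber. It is not the actual loop in this module (see `FiberManager.run` in the source for that); `selectDemo` and the `waiting` flag are invented for the example, and `socketPair` is used just so it runs without any network setup.

	---
	import core.thread.fiber;
	import std.socket;

	string selectDemo() {
		// two already-connected sockets, so we can play both client and server
		auto pair = socketPair();
		auto reader = pair[0];
		auto writer = pair[1];
		reader.blocking = false;

		string received;
		bool waiting; // stands in for the manager's list of pending sockets

		auto fiber = new Fiber(() {
			char[16] buffer;
			while(true) {
				auto ret = reader.receive(buffer[]);
				if(ret == Socket.ERROR && wouldHaveBlocked()) {
					waiting = true; // ask the loop to watch this socket
					Fiber.yield();
					continue; // woken up: retry the receive
				}
				if(ret > 0)
					received = buffer[0 .. ret].idup;
				break;
			}
		});

		fiber.call(); // nothing to read yet, so it yields right away
		assert(waiting);

		writer.send("hello");

		auto readSet = new SocketSet;
		while(waiting) {
			readSet.reset();
			readSet.add(reader);
			if(Socket.select(readSet, null, null) <= 0)
				continue;
			if(readSet.isSet(reader)) {
				waiting = false;
				fiber.call(); // the "event handler" is just resuming the fiber
			}
		}

		reader.close();
		writer.close();
		return received;
	}

	void main() {
		assert(selectDemo() == "hello");
	}
	---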

	I encourage you to view the source of this file and try to follow along. It isn't terribly long and can hopefully help to introduce you to a new world of possibilities. You can use Fibers in other cases too. For example, the game I'm working on uses them in enemy scripts. It sets up their action, then yields and lets the player take their turn. When it is the computer's turn again, the script fiber resumes. Same principle, simple code once you get to know it.

	$(H2 Limitations)
	`Socket.select` has a limit on the number of pending sockets at any time, and since you have to loop through them each iteration, it can get slow with huge numbers of concurrent connections. I'd note that you probably will not see this problem, but it certainly can happen. Similarly, there are `new` allocations for each socket and virtual calls throughout, which, again, probably will be good enough for you, but this module is not C10K+ "web scale".

	It also cannot be combined with other event loops in the same thread. But, since the [FiberManager] only uses the thread you give it, you might consider running it here and other things alongside in their own threads.

	Credits:
		vibe.d is the first time I recall even hearing of fibers and is the direct inspiration for this.

	History:
		Written December 26, 2020. First included in arsd-official dub release 9.1.

	License:
		BSL-1.0, same as Phobos
+/
module arsd.fibersocket; // previously known as "centivibe" since it provides like 1/100th the functionality of vibe.d

public import std.socket;
import core.thread.fiber;

/// just because I forget how to enable this, trivial helper function
void allowBroadcast(Socket socket) {
	socket.setOption(SocketOptionLevel.SOCKET, SocketOption.BROADCAST, 1);
}

/// Convenience function to loop and send until it is all sent or an error occurs.
/// Returns the original length of `data` on success, or the non-positive result of the failing `send` call.
ptrdiff_t sendAll(Socket s, scope const(void)[] data) {
	auto ol = data.length;
	while(data.length) {
		auto ret = s.send(data);
		if(ret <= 0)
			return ret;
		data = data[ret .. $];
	}
	return ol;
}

/++
	Subclass of Phobos' socket that basically works the same way, except it yields back to the [FiberManager] when it would have blocked.

	You should not modify the `blocking` flag on these and generally not construct them, connect them, or listen on them yourself (let [FiberManager] do the setup for you), but otherwise they work the same as the original Phobos [std.socket.Socket] and implement the very same interface. You can call the exact same functions with original Sockets or FiberSockets.
+/
class FiberSocket : Socket {
	enum PendingOperation {
		none, read, write
	}

	protected this(FiberManager fm) pure nothrow @safe {
		this.fm = fm;
		super();
	}

	/// You should probably call the helper functions in [FiberManager] instead.
	this(FiberManager fm, AddressFamily af, SocketType st, Fiber fiber) {
		assert(fm !is null);

		this.fm = fm;
		this.fiber = fiber;
		super(af, st);
		this.blocking = false;
	}

	/// Resumes the fiber associated with this socket.
	void callFiber() {
		fiber.call();
	}

	private FiberManager fm;
	private Fiber fiber;
	private PendingOperation pendingOperation;

	// registers this socket with the manager for the given operation, then yields
	private void queue(PendingOperation op) @trusted nothrow {
		pendingOperation = op;
		fm.pendingSockets ~= this;
		fiber.yield();
	}

	protected override Socket accepting() pure nothrow {
		return new FiberSocket(fm);
	}

	// runs `what`; if it would have blocked, queues up, yields, and retries after being resumed
	private ptrdiff_t magic(scope ptrdiff_t delegate() @safe what, PendingOperation op) @trusted {
		try_again:
		auto r = what();
		if(r == -1 && wouldHaveBlocked()) {
			queue(op);
			goto try_again;
		}
		return r;
	}

	/// Yielding override of the Phobos interface
	override ptrdiff_t send(scope const(void)[] buf, SocketFlags flags) {
		return magic( () { return super.send(buf, flags); }, PendingOperation.write);
	}
	/// ditto
	override ptrdiff_t receive(scope void[] buf, SocketFlags flags) {
		return magic( () { return super.receive(buf, flags); }, PendingOperation.read);
	}

	/// ditto
	override ptrdiff_t receiveFrom(scope void[] buf, SocketFlags flags, ref Address from) @trusted {
		return magic( () { return super.receiveFrom(buf, flags, from); }, PendingOperation.read);
	}
	/// ditto
	override ptrdiff_t receiveFrom(scope void[] buf, SocketFlags flags) @trusted {
		return magic( () { return super.receiveFrom(buf, flags); }, PendingOperation.read);
	}
	/// ditto
	override ptrdiff_t sendTo(scope const(void)[] buf, SocketFlags flags, Address to) @trusted {
		return magic( () { return super.sendTo(buf, flags, to); }, PendingOperation.write);
	}
	/// ditto
	override ptrdiff_t sendTo(scope const(void)[] buf, SocketFlags flags) @trusted {
		return magic( () { return super.sendTo(buf, flags); }, PendingOperation.write);
	}

	// lol overload sets
	/// The Phobos overloads are still available too, they forward to the overrides in this class and thus work the same way.
	alias send = typeof(super).send;
	/// ditto
	alias receive = typeof(super).receive;
	/// ditto
	alias sendTo = typeof(super).sendTo;
	/// ditto
	alias receiveFrom = typeof(super).receiveFrom;
}

/++
	The FiberManager is responsible for running your socket event loop and dispatching events to your fibers. It is your main point of interaction with this library.

	Generally, a `FiberManager` will exist in your `main` function and take over that thread when you call [run]. You construct one, set up your listeners, etc., then call `run` and let it do its thing.
+/
class FiberManager {
	private FiberSocket[] pendingSockets;

	private size_t defaultFiberStackSize;

	/++
		Params:
			defaultFiberStackSize = size, in bytes, of the fiber stacks [makeFiber] returns. If 0 (the default), use the druntime default.
	+/
	this(size_t defaultFiberStackSize = 0) {
		this.defaultFiberStackSize = defaultFiberStackSize;
	}

	/++
		Convenience function to make a worker fiber based on the manager's configuration.

		This is used internally when connections come in.
	+/
	public Fiber makeFiber(void delegate() fn) {
		return defaultFiberStackSize ? new Fiber(fn, defaultFiberStackSize) : new Fiber(fn);
	}

	/++
		Convenience functions for creating listening sockets. These are trivial forwarders to [listenStream], constructing the appropriate [std.socket.Address] object for you. Note the address lookup does NOT at this time use the fiber io and may thus block your thread.

		You can `close` the returned socket when you want to stop listening, or just ignore it if you want to listen for the whole duration of the program.
	+/
	final Socket listenTcp6(ushort port, void delegate(Socket) connectionHandler, int backlog = 8) {
		return listenStream(new Internet6Address(port), connectionHandler, backlog);
	}

	/// ditto
	final Socket listenTcp6(string address, ushort port, void delegate(Socket) connectionHandler, int backlog = 8) {
		return listenStream(new Internet6Address(address, port), connectionHandler, backlog);
	}

	/// ditto
	final Socket listenTcp4(ushort port, void delegate(Socket) connectionHandler, int backlog = 8) {
		return listenStream(new InternetAddress(port), connectionHandler, backlog);
	}

	/// ditto
	final Socket listenTcp4(string address, ushort port, void delegate(Socket) connectionHandler, int backlog = 8) {
		return listenStream(new InternetAddress(address, port), connectionHandler, backlog);
	}

	/// ditto
	version(Posix)
	final Socket listenUnix(string path, void delegate(Socket) connectionHandler, int backlog = 8) {
		return listenStream(new UnixAddress(path), connectionHandler, backlog);
	}

	/++
		Core listen function for streaming connection-oriented sockets (TCP, etc.)

		It will:

		$(LIST
			* Create a [FiberSocket]
			* Create fibers on it for each incoming connection which call your `connectionHandler`
			* Bind to the given `Address`
			* Call `socket.listen(backlog)`
			* Start `accept`ing connections.
		)

		Returns: the listening socket. You shouldn't do much with this except maybe `close` it when you are done.
	+/
	Socket listenStream(Address addr, void delegate(Socket) connectionHandler, int backlog) {
		assert(connectionHandler !is null, "null connectionHandler passed to a listenTcp function");

		FiberSocket socket;

		socket = new FiberSocket(this, addr.addressFamily, SocketType.STREAM, makeFiber(
			delegate() {
				while(socket.isAlive()) {
					socket.queue(FiberSocket.PendingOperation.read); // put fiber on hold until ready to accept

					auto ns = cast(FiberSocket) socket.accept();
					ns.blocking = false;
					ns.fiber = makeFiber(delegate() {
						connectionHandler(ns);
					});
					// need to get the new connection started
					ns.fiber.call();
				}
			}
		));
		socket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
		socket.bind(addr);
		socket.blocking = false;
		socket.listen(backlog);

		socket.callFiber();

		return socket;
	}

	/++
		Convenience functions that forward to [connectStream] for the given protocol. They connect, send, and receive in an async manner, but do not create their own fibers - you must already be in one when you call these functions.

		If you are already in a connection handler set in the listen family of functions, you're all set - those are automatically in fibers. If you are in your main function though, you need to make a worker fiber first. (But tbh if you only have one connection anyway, you might as well use a standard Socket.)

		Making a worker fiber is simple enough. You can do it with `new Fiber` or with [FiberManager.makeFiber] (the latter just calls the former with a size argument set up in the FiberManager constructor).

		---
		auto fm = new FiberManager();
		fm.makeFiber(() {
			auto socket = fm.connectTcp4(...);

			socket.send(...);
		}).call(); // you must call it the first time yourself so it self-registers
		---

		OR

		---
		import core.thread.fiber;

		auto fiber = new Fiber(() {
			auto socket = fm.connectTcp4(...);
			// do stuff in here
		});
		// `call` returns a Throwable, not the Fiber, so keep the fiber in its own variable first
		fiber.call(); // same deal, still need to call it the first time yourself to give it a chance to self-register
		---
	+/
	final Socket connectTcp4(string address, ushort port) {
		return connectStream(new InternetAddress(address, port));
	}

	/// ditto
	final Socket connectTcp6(string address, ushort port) {
		return connectStream(new Internet6Address(address, port));
	}

	/// ditto
	version(Posix)
	final Socket connectUnix(string path) {
		return connectStream(new UnixAddress(path));
	}

	/++
		Connects a streaming socket to the given address that will yield to this FiberManager instead of blocking.
	+/
	Socket connectStream(Address address) {
		assert(Fiber.getThis !is null, "connect functions can only be used from inside preexisting fibers");
		FiberSocket socket = new FiberSocket(this, address.addressFamily, SocketType.STREAM, Fiber.getThis);
		socket.connect(address);
		socket.queue(FiberSocket.PendingOperation.write); // wait for it to connect
		scope(failure)
			socket.close();
		// and ensure the connection was successful before proceeding
		int result;
		if(socket.getOption(SocketOptionLevel.SOCKET, SocketOption.ERROR, result) < 0)
			throw new Exception("get socket error failed");
		if(result != 0)
			throw new Exception("Connect failed");
		return socket;
	}

	/++
		These are convenience functions that forward to [bindDatagram].

		UDP sockets don't connect per se, but they basically work the same as [connectStream]. See the caveat about requiring a premade Fiber from that page.
	+/
	Socket bindUdp4(string address, ushort port) {
		return bindDatagram(new InternetAddress(address, port));
	}
	/// ditto
	Socket bindUdp4(ushort port) {
		return bindDatagram(new InternetAddress(port));
	}
	/// ditto
	Socket bindUdp6(string address, ushort port) {
		return bindDatagram(new Internet6Address(address, port));
	}
	/// ditto
	Socket bindUdp6(ushort port) {
		return bindDatagram(new Internet6Address(port));
	}

	/++
		Only valid from inside a worker fiber, see [makeFiber].

		---
		fm.makeFiber(() {
			auto sock = fm.bindDatagram(new InternetAddress(5555));
			sock.receiveFrom(....);
		}).call(); // remember to call it the first time or it will never start!
		---
	+/
	Socket bindDatagram(Address address) {
		assert(Fiber.getThis !is null, "bind datagram functions can only be used from inside preexisting fibers");
		FiberSocket socket = new FiberSocket(this, address.addressFamily, SocketType.DGRAM, Fiber.getThis);
		socket.bind(address);
		return socket;
	}

	/++
		Runs the program and manages the fibers and connections for you, calling the appropriate functions when new events arrive.

		Returns when no connections are left open.
	+/
	void run() {
		auto readSet = new SocketSet;
		auto writeSet = new SocketSet;
		while(true) {
			readSet.reset();
			writeSet.reset();
			int added;
			for(int idx = 0; idx < pendingSockets.length; idx++) {
				auto pending = pendingSockets[idx];
				if(!pending.isAlive()) {
					// order not important here since we haven't done any real work yet
					// really it shouldn't even be on the list.
					pendingSockets[idx] = pendingSockets[$-1];
					pendingSockets = pendingSockets[0 .. $-1];
					pendingSockets.assumeSafeAppend();
					idx--;
					continue;
				}
				final switch(pending.pendingOperation) {
					case FiberSocket.PendingOperation.none:
						assert(0); // why is this object on this list?!
					case FiberSocket.PendingOperation.write:
						writeSet.add(pending);
						added++;
						break;
					case FiberSocket.PendingOperation.read:
						readSet.add(pending);
						added++;
						break;
				}
			}
			if(added == 0)
				return; // no work to do, all connections closed
			auto eventCount = Socket.select(readSet, writeSet, null);//, 5.seconds);
			if(eventCount == -1)
				continue;
			for(int idx = 0; idx < pendingSockets.length && eventCount > 0; idx++) {
				auto pending = pendingSockets[idx];
				SocketSet toCheck;
				final switch(pending.pendingOperation) {
					case FiberSocket.PendingOperation.none:
						break;
					case FiberSocket.PendingOperation.write:
						toCheck = writeSet;
						break;
					case FiberSocket.PendingOperation.read:
						toCheck = readSet;
						break;
				}
				if(toCheck is null)
					continue;

				if(toCheck.isSet(pending)) {
					eventCount--;
					import std.algorithm.mutation;
					// the order is fairly important since previous calls can append to
					// this again, and we want to be sure we process the ones in this batch
					// before seeing anything from the next batch.
					pendingSockets = remove!(SwapStrategy.stable)(pendingSockets, idx);
					pendingSockets.assumeSafeAppend();
					idx--; // the slot we used to have is now different, so it needs to be reprocessed
					pending.fiber.call();
				}
			}
		}
	}
}
501 	}
502 }