The OpenD Programming Language

1 /// I never finished this. The idea is to use CT reflection to make calling another process feel as simple as calling in-process objects. Will come eventually but no promises.
2 module arsd.rpc;
3 
4 /*
5 	FIXME:
6 		1) integrate with arsd.eventloop
7 		2) make it easy to use with other processes; pipe to a process and talk to it that way. perhaps with shared memory too?
8 		3) extend the serialization capabilities
9 
10 
11 	@Throws!(List, Of, Exceptions)
12 	classes are also RPC proxied
13 	stdin/out/err also redirected
14 */
15 
16 ///+ //example usage
17 interface ExampleNetworkFunctions {
18 	string sayHello(string name);
19 	int add(int a, int b);
20 	S2 structTest(S1);
21 	void die();
22 }
23 
24 // the server must implement the interface
25 class ExampleServer : ExampleNetworkFunctions {
26 	override string sayHello(string name) {
27 		return "Hello, " ~ name;
28 	}
29 
30 	override int add(int a, int b) {
31 		return a+b;
32 	}
33 
34 	override S2 structTest(S1 a) {
35 		return S2(a.name, a.number);
36 	}
37 
38 	override void die() {
39 		throw new Exception("death requested");
40 	}
41 
42 	mixin NetworkServer!ExampleNetworkFunctions;
43 }
44 
45 struct S1 {
46 	int number;
47 	string name;
48 }
49 
50 struct S2 {
51 	string name;
52 	int number;
53 }
54 
55 import std.stdio;
56 void main(string[] args) {
57 	if(args.length > 1) {
58 		auto client = makeNetworkClient!ExampleNetworkFunctions("localhost", 5005);
59 		// these work like the interface above, but instead of returning the value,
60 		// they take callbacks for success (where the arg is the retval)
61 		// and failure (the arg is the exception)
62 		client.sayHello("whoa", (a) { writeln(a); }, null);
63 		client.add(1,2, (a) { writeln(a); }, null);
64 		client.add(10,20, (a) { writeln(a); }, null);
65 		client.structTest(S1(20, "cool!"), (a) { writeln(a.name, " -- ", a.number); }, null);
66 		client.die(delegate () { writeln("shouldn't happen"); }, delegate(a) { writeln(a); });
67 		client.eventLoop();
68 
69 		/*
70 		auto client = makeNetworkClient!(ExampleNetworkFunctions, false)("localhost", 5005);
71 		writeln(client.sayHello("whoa"));
72 		writeln(client.add(1, 2));
73 		client.die();
74 		writeln(client.add(1, 2));
75 		*/
76 	} else {
77 		auto server = new ExampleServer(5005);
78 		server.eventLoop();
79 	}
80 }
81 //+/
82 
83 mixin template NetworkServer(Interface) {
84 	import std.socket;
85 	private Socket socket;
86 	public this(ushort port) {
87 		socket = new TcpSocket();
88 		socket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
89 		socket.bind(new InternetAddress(port));
90 		socket.listen(16);
91 	}
92 
93 	final public void eventLoop() {
94 		auto check = new SocketSet();
95 		Socket[] connections;
96 		connections.reserve(16);
97 		ubyte[4096] buffer;
98 
99 		while(true) {
100 			check.reset();
101 			check.add(socket);
102 			foreach(connection; connections) {
103 				check.add(connection);
104 			}
105 
106 			if(Socket.select(check, null, null)) {
107 				if(check.isSet(socket)) {
108 					connections ~= socket.accept();
109 				}
110 
111 				foreach(connection; connections) {
112 					if(check.isSet(connection)) {
113 						auto gotNum = connection.receive(buffer);
114 						if(gotNum == 0) {
115 							// connection is closed, we could remove it from the list
116 						} else {
117 							auto got = buffer[0 .. gotNum];
118 							another:
119 							int length, functionNumber, sequenceNumber;
120 							got = deserializeInto(got, length);
121 							got = deserializeInto(got, functionNumber);
122 							got = deserializeInto(got, sequenceNumber);
123 
124 							//writeln("got ", sequenceNumber, " calling ", functionNumber);
125 
126 							auto remaining = got[length .. $];
127 							got = got[0 .. length];
128 							import std.conv;
129 							assert(length == got.length, to!string(length) ~ " != " ~ to!string(got.length)); // FIXME: what if it doesn't all come at once?
130 							callByNumber(functionNumber, sequenceNumber, got, connection);
131 
132 							if(remaining.length) {
133 								got = remaining;
134 								goto another;
135 							}
136 						}
137 					}
138 				}
139 			}
140 		}
141 	}
142 
143 	final private void callByNumber(int functionNumber, int sequenceNumber, const(ubyte)[] buffer, Socket connection) {
144 		ubyte[4096] sendBuffer;
145 		int length = 12;
146 		// length, sequence, success
147 		serialize(sendBuffer[4 .. 8], sequenceNumber);
148 		string callCode() {
149 			import std.conv;
150 			import std.traits;
151 			string code;
152 			foreach(memIdx, member; __traits(allMembers, Interface)) {
153 				code ~= "\t\tcase " ~ to!string(memIdx + 1) ~ ":\n";
154 				alias mem = PassThrough!(__traits(getMember, Interface, member));
155 				// we need to deserialize the arguments, call the function, and send back the response (if there is one)
156 				string argsString;
157 				foreach(i, arg; ParameterTypeTuple!mem) {
158 					if(i)
159 						argsString ~= ", ";
160 					auto istr = to!string(i);
161 					code ~= "\t\t\t" ~ arg.stringof ~ " arg" ~ istr ~ ";\n";
162 					code ~= "\t\t\tbuffer = deserializeInto(buffer, arg" ~ istr ~ ");\n";
163 
164 					argsString ~= "arg" ~ istr;
165 				}
166 
167 				// the call
168 				static if(is(ReturnType!mem == void)) {
169 					code ~= "\n\t\t\t" ~ member ~ "(" ~ argsString ~ ");\n";
170 				} else {
171 					// call and return answer
172 					code ~= "\n\t\t\tauto ret = " ~ member ~ "(" ~ argsString ~ ");\n";
173 
174 					code ~= "\t\t\tserialize(sendBuffer[8 .. 12], cast(int) 1);\n"; // yes success
175 					code ~= "\t\t\tauto serialized = serialize(sendBuffer[12 .. $], ret);\n";
176 					code ~= "\t\t\tserialize(sendBuffer[0 .. 4], cast(int) serialized.length);\n";
177 					code ~= "\t\t\tlength += serialized.length;\n";
178 				}
179 				code ~= "\t\tbreak;\n";
180 			}
181 			return code;
182 		}
183 
184 		try {
185 			switch(functionNumber) {
186 				default: assert(0, "unknown function");
187 				//pragma(msg, callCode());
188 				mixin(callCode());
189 			}
190 		} catch(Throwable t) {
191 			//writeln("thrown: ", t);
192 			serialize(sendBuffer[8 .. 12], cast(int) 0); // no success
193 
194 			auto place = sendBuffer[12 .. $];
195 			int l;
196 			auto s = serialize(place, t.msg);
197 			place = place[s.length .. $];
198 			l += s.length;
199 			s = serialize(place, t.file);
200 			place = place[s.length .. $];
201 			l += s.length;
202 			s = serialize(place, t.line);
203 			place = place[s.length .. $];
204 			l += s.length;
205 
206 			serialize(sendBuffer[0 .. 4], l);
207 			length += l;
208 		}
209 
210 		if(length != 12) // if there is a response...
211 			connection.send(sendBuffer[0 .. length]);
212 	}
213 }
214 
215 template PassThrough(alias a) {
216 	alias PassThrough = a;
217 }
218 
219 // general FIXME: what if we run out of buffer space?
220 
221 // returns the part of the buffer that was actually used
222 final public ubyte[] serialize(T)(ubyte[] buffer, in T s) {
223 	auto original = buffer;
224 	size_t totalLength = 0;
225 	import std.traits;
226 	static if(isArray!T) {
227 		/* length */ {
228 			auto used = serialize(buffer, cast(int)  s.length);
229 			totalLength += used.length;
230 			buffer = buffer[used.length .. $];
231 		}
232 		foreach(i; s) {
233 			auto used = serialize(buffer, i);
234 			totalLength += used.length;
235 			buffer = buffer[used.length .. $];
236 		}
237 	} else static if(isPointer!T) {
238 		static assert(0, "no pointers allowed");
239 	} else static if(!hasIndirections!T) {
240 		// covers int, float, char, etc. most the builtins
241 		import std.string;
242 		assert(buffer.length >= T.sizeof, format("%s won't fit in %s buffer", T.stringof, buffer.length));
243 		buffer[0 .. T.sizeof] = (cast(ubyte*)&s)[0 .. T.sizeof];
244 		totalLength += T.sizeof;
245 		buffer = buffer[T.sizeof .. $];
246 	} else {
247 		// structs, classes, etc.
248 		foreach(i, t; s.tupleof) {
249 			auto used = serialize(buffer, t);
250 			totalLength += used.length;
251 			buffer = buffer[used.length .. $];
252 		}
253 	}
254 
255 	return original[0 .. totalLength];
256 }
257 
258 // returns the remaining part of the buffer
259 final public inout(ubyte)[] deserializeInto(T)(inout(ubyte)[] buffer, ref T s) {
260 	import std.traits;
261 
262 	static if(isArray!T) {
263 		size_t length;
264 		buffer = deserializeInto(buffer, length);
265 		s.length = length;
266 		foreach(i; 0 .. length)
267 			buffer = deserializeInto(buffer, s[i]);
268 	} else static if(isPointer!T) {
269 		static assert(0, "no pointers allowed");
270 	} else static if(!hasIndirections!T) {
271 		// covers int, float, char, etc. most the builtins
272 		(cast(ubyte*)(&s))[0 .. T.sizeof] = buffer[0 .. T.sizeof];
273 		buffer = buffer[T.sizeof .. $];
274 	} else {
275 		// structs, classes, etc.
276 		foreach(i, t; s.tupleof) {
277 			buffer = deserializeInto(buffer, s.tupleof[i]);
278 		}
279 	}
280 
281 	return buffer;
282 }
283 
284 mixin template NetworkClient(Interface, bool useAsync = true) {
285 	private static string createClass() {
286 		// this doesn't actually inherit from the interface because
287 		// the return value needs to be handled async
288 		string code;// = `final class Class /*: ` ~ Interface.stringof ~ `*/ {`;
289 		code ~= "\n\timport std.socket;";
290 		code ~= "\n\tprivate Socket socket;";
291 		if(useAsync) {
292 			code ~= "\n\tprivate void delegate(const(ubyte)[] buffer)[uint] onSuccesses;";
293 			code ~= "\n\tprivate void delegate(const(ubyte)[] buffer)[uint] onErrors;";
294 		}
295 		code ~= "\n\tprivate uint lastSequenceNumber;";
296 		code ~= q{
297 	private this(string host, ushort port) {
298 		this.socket = new TcpSocket();
299 		this.socket.connect(new InternetAddress(host, port));
300 	}
301 	};
302 
303 		if(useAsync)
304 		code ~= q{
305 	final public void eventLoop() {
306 		ubyte[4096] buffer;
307 		bool open = true;
308 
309 		do {
310 			auto gotNum = socket.receive(buffer);
311 			if(gotNum == 0) {
312 				open = false;
313 				break;
314 			}
315 			while(gotNum < 9) {
316 				auto g2 = socket.receive(buffer[gotNum .. $]);
317 				if(g2 == 0) {
318 					open = false;
319 					break;
320 				}
321 				gotNum += g2;
322 			}
323 
324 			auto got = buffer[0 .. gotNum];
325 			another:
326 			uint length, seq;
327 			uint success;
328 			got = deserializeInto(got, length);
329 			got = deserializeInto(got, seq);
330 			got = deserializeInto(got, success);
331 			auto more = got[length .. $];
332 
333 			if(got.length >= length) {
334 				if(success) {
335 					auto s = (seq in onSuccesses);
336 					if(s !is null && *s !is null)
337 						(*s)(got);
338 				} else {
339 					auto s = (seq in onErrors);
340 					if(s !is null && *s !is null)
341 						(*s)(got);
342 				}
343 			}
344 
345 			if(more.length) {
346 				got = more;
347 				goto another;
348 			}
349 		} while(open);
350 	}
351 	};
352 		code ~= "\n\tpublic:\n";
353 
354 		foreach(memIdx, member; __traits(allMembers, Interface)) {
355 			import std.traits;
356 			alias mem = PassThrough!(__traits(getMember, Interface, member));
357 			string type;
358 			if(useAsync)
359 				type = "void";
360 			else {
361 				static if(is(ReturnType!mem == void))
362 					type = "void";
363 				else
364 					type = (ReturnType!mem).stringof;
365 			}
366 			code ~= "\t\tfinal "~type~" " ~ member ~ "(";
367 			bool hadArgument = false;
368 			import std.conv;
369 			// arguments
370 			foreach(i, arg; ParameterTypeTuple!mem) {
371 				if(hadArgument)
372 					code ~= ", ";
373 				// FIXME: this is one place the arg can get unknown if we don't have all the imports
374 				code ~= arg.stringof ~ " arg" ~ to!string(i);
375 				hadArgument = true;
376 			}
377 
378 			if(useAsync) {
379 				if(hadArgument)
380 					code ~= ", ";
381 
382 				static if(is(ReturnType!mem == void))
383 					code ~= "void delegate() onSuccess";
384 				else
385 					code ~= "void delegate("~(ReturnType!mem).stringof~") onSuccess";
386 				code ~= ", ";
387 				code ~= "void delegate(Throwable) onError";
388 			}
389 			code ~= ") {\n";
390 			code ~= "auto seq = ++lastSequenceNumber;";
391 		if(useAsync)
392 		code ~= q{
393 			#line 252
394 			onSuccesses[seq] = (const(ubyte)[] buffer) {
395 				onSuccesses.remove(seq);
396 				onErrors.remove(seq);
397 
398 				import std.traits;
399 
400 				static if(is(ParameterTypeTuple!(typeof(onSuccess)) == void)) {
401 					if(onSuccess !is null)
402 						onSuccess();
403 				} else {
404 					ParameterTypeTuple!(typeof(onSuccess)) args;
405 					foreach(i, arg; args)
406 						buffer = deserializeInto(buffer, args[i]);
407 					if(onSuccess !is null)
408 						onSuccess(args);
409 				}
410 			};
411 			onErrors[seq] = (const(ubyte)[] buffer) {
412 				onSuccesses.remove(seq);
413 				onErrors.remove(seq);
414 				auto t = new Throwable("");
415 				buffer = deserializeInto(buffer, t.msg);
416 				buffer = deserializeInto(buffer, t.file);
417 				buffer = deserializeInto(buffer, t.line);
418 
419 				if(onError !is null)
420 					onError(t);
421 			};
422 		};
423 
424 		code ~= q{
425 			#line 283
426 			ubyte[4096] bufferBase;
427 			auto buffer = bufferBase[12 .. $]; // leaving room for size, func number, and seq number
428 			ubyte[] serialized;
429 			int used;
430 		};
431 			// preparing the request
432 			foreach(i, arg; ParameterTypeTuple!mem) {
433 				code ~= "\t\t\tserialized = serialize(buffer, arg" ~ to!string(i) ~ ");\n";
434 				code ~= "\t\t\tused += serialized.length;\n";
435 				code ~= "\t\t\tbuffer = buffer[serialized.length .. $];\n";
436 			}
437 
438 			code ~= "\t\t\tserialize(bufferBase[0 .. 4], used);\n";
439 			code ~= "\t\t\tserialize(bufferBase[4 .. 8], " ~ to!string(memIdx + 1) ~ ");\n";
440 			code ~= "\t\t\tserialize(bufferBase[8 .. 12], seq);\n";
441 
442 			// FIXME: what if it doesn't all send at once?
443 			code ~= "\t\t\tsocket.send(bufferBase[0 .. 12 + used]);\n";
444 			//code ~= `writeln("sending ", bufferBase[0 .. 12 + used]);`;
445 
446 		if(!useAsync)
447 		code ~= q{
448 			ubyte[4096] dbuffer;
449 			bool open = true;
450 			static if(is(typeof(return) == void)) {
451 
452 			} else
453 				typeof(return) returned;
454 
455 			auto gotNum = socket.receive(dbuffer);
456 			if(gotNum == 0) {
457 				open = false;
458 				throw new Exception("connection closed");
459 			}
460 			while(gotNum < 9) {
461 				auto g2 = socket.receive(dbuffer[gotNum .. $]);
462 				if(g2 == 0) {
463 					open = false;
464 					break;
465 				}
466 				gotNum += g2;
467 			}
468 
469 			auto got = dbuffer[0 .. gotNum];
470 			another:
471 			uint length;
472 			uint success;
473 			got = deserializeInto(got, length);
474 			got = deserializeInto(got, seq);
475 			got = deserializeInto(got, success);
476 			auto more = got[length .. $];
477 
478 			if(got.length >= length) {
479 				if(success) {
480 					/*
481 					auto s = (seq in onSuccesses);
482 					if(s !is null && *s !is null)
483 						(*s)(got);
484 					*/
485 					static if(is(typeof(return) == void)) {
486 					} else {
487 						got = deserializeInto(got, returned);
488 					}
489 				} else {
490 					/*
491 					auto s = (seq in onErrors);
492 					if(s !is null && *s !is null)
493 						(*s)(got);
494 					*/
495 					auto t = new Throwable("");
496 					got = deserializeInto(got, t.msg);
497 					got = deserializeInto(got, t.file);
498 					got = deserializeInto(got, t.line);
499 					throw t;
500 				}
501 			}
502 
503 			if(more.length) {
504 				got = more;
505 				goto another;
506 			}
507 			static if(is(typeof(return) == void)) {
508 
509 			} else
510 				return returned;
511 		};
512 
513 			code ~= "}\n";
514 			code ~= "\n";
515 		}
516 		//code ~= `}`;
517 		return code;
518 	}
519 
520 	//pragma(msg, createClass()); // for debugging help
521 	mixin(createClass());
522 }
523 
524 auto makeNetworkClient(Interface, bool useAsync = true)(string host, ushort port) {
525 	class Thing {
526 		mixin NetworkClient!(Interface, useAsync);
527 	}
528 
529 	return new Thing(host, port);
530 }
531 
532 // the protocol is:
533 /*
534 
535 client connects
536 	ulong interface hash
537 
538 handshake complete
539 
540 messages:
541 
542 	uint messageLength
543 	uint sequence number
544 	ushort function number, 0 is reserved for interface check
545 	serialized arguments....
546 
547 
548 
549 server responds with answers:
550 
551 	uint messageLength
552 	uint re: sequence number
553 	ubyte, 1 == success, 0 == error
554 	serialized return value
555 
556 */