The OpenD Programming Language

1 /**
2 	OBSOLETE: This provides a kind of real time updates that can be consumed
3 	by javascript (and probably other things eventually). Superseded by
4 	new functionality built into [arsd.cgi].
5 
6 	First, you compile the server app. dmd -version=standalone_rtud -version=rtud_daemon
7 
8 	Run it. It's pretty generic; probably don't have to modify it
9 	but you always can. It's useful to have a long running process
10 	anyway.
11 
12 	But then you'll want an intermediary between this and the javascript.
13 	Use the handleListenerGateway() function in your cgi app for that.
14 	You can pass it a channel prefix for things like user ids.
15 
16 	In your javascript, use var source = new EventSource("path to gateway");
17 	And source.addEventListener(type, function, false);
18 
19 	Note: this javascript does not work in all browsers, but
20 	real time updates should really be optional anyway.
21 
22 	I might add a traditional ajax fallback but still, it's all
23 	js so be sure it's non-essential if possible.
24 
25 
26 	And in your app, as events happen, use in D:
27 		auto stream = new UpdateStream(channel);
28 		stream.sendMessage(type, message);
29 
30 	and the helper app will push it all out. You might want to wrap
31 	some of this in try/catch since if the helper app dies, this will
32 	throw since it can't connect.
33 
34 
35 	I found using user names as channels is good stuff. Then your JS
36 	doesn't provide a channel at all - your helper app gives it through
37 	the channel prefix argument.
38 */
39 module arsd.rtud;
40 
41 import std.string;
42 import std.array : replace;
43 import std.conv;
44 import std.date;
45 
46 
/**
	Producer-side handle for pushing events to the update daemon.

	Constructing one opens a connection to port 7071 on localhost through
	openNetwork. Each method writes a single url-encoded line to that
	connection and flushes it right away.
*/
class UpdateStream {
	/// Connection to the daemon; closed by the destructor.
	File net;
	/// Channel name attached to every message sent with sendMessage.
	string channel;

	/// Remembers the channel and connects to localhost:7071.
	this(string channel) {
		this.channel = channel;
		net = openNetwork("localhost", 7071);
	}

	~this() {
		net.close();
	}

	/// Sends a line with operation=delete for the given message id.
	void deleteMessage(string messageId) {
		import arsd.cgi; // : encodeVariables;

		string[string] fields;
		fields["id"] = messageId;
		fields["operation"] = "delete";

		net.writeln(encodeVariables(fields));
		net.flush();
	}

	/**
		Sends one event on this stream's channel.

		Params:
			eventType = sent as the "type" field
			messageText = sent as the "data" field
			ttl = sent as the "ttl" field, converted to text
	*/
	void sendMessage(string eventType, string messageText, long ttl = 2500) {
		import arsd.cgi; // : encodeVariables;

		// no "operation" or "id" is sent here, so the receiving side
		// has to fall back to its own defaults for those
		string[string] fields;
		fields["channel"] = channel;
		fields["type"] = eventType;
		fields["data"] = messageText;
		fields["ttl"] = to!string(ttl);

		net.writeln(encodeVariables(fields));
		net.flush();
	}
}
86 
87 /+
88 		if("channels" in message) {
89 		if("last-message-id" in message)
90 		if("minimum-time" in message)
91 		if("close-time" in message)
92 +/
93 
94 version(D_Version2) {
95 	static import linux = core.sys.posix.unistd;
96 	static import sock = core.sys.posix.sys.socket;
97 } else {
98 	static import linux = std.c.linux.linux;
99 	static import sock = std.c.linux.socket;
100 }
101 
/**
	Opens a TCP connection to host:port and returns the raw socket descriptor.

	The host name is resolved with gethostbyname and only the first returned
	address is tried. The caller owns the returned descriptor and must close it.

	Throws: StdioException if name resolution, socket creation or the
	connect call fails. The socket is closed again when a failure happens
	after it was created.
*/
int openNetworkFd(string host, ushort port) {
	import std.exception;
	// These names do not live in the module behind the `sock` alias, so
	// they are pulled in from their own druntime modules here.
	import core.stdc.string : memcpy;
	import core.sys.posix.arpa.inet : htons;
	import core.sys.posix.netdb : gethostbyname;
	import core.sys.posix.netinet.in_ : sockaddr_in;

	auto h = enforce(gethostbyname(toStringz(host)),
		new StdioException("gethostbyname"));

	int s = sock.socket(sock.AF_INET, sock.SOCK_STREAM, 0);
	enforce(s != -1, new StdioException("socket"));

	scope(failure) {
	    linux.close(s);
	}

	sockaddr_in addr;

	addr.sin_family = sock.AF_INET;
	addr.sin_port = htons(port);

	// never copy more bytes than the address field can hold
	enforce(h.h_length > 0 && cast(size_t) h.h_length <= addr.sin_addr.s_addr.sizeof,
		new StdioException("gethostbyname"));
	memcpy(&addr.sin_addr.s_addr, h.h_addr, h.h_length);

	enforce(sock.connect(s, cast(sock.sockaddr*) &addr, addr.sizeof) != -1,
		new StdioException("Connect failed"));

	return s;
}
125 
/**
	Writes all of s to the descriptor fd, retrying after short writes.

	Returns early, without an error, if a write call reports zero bytes.

	Throws: Exception("couldn't write") when a write call returns a
	negative value.
*/
void writeToFd(int fd, string s) {
	// do/while: the first write is attempted even when s is empty
	do {
		auto written = linux.write(fd, s.ptr, s.length);
		if(written < 0)
			throw new Exception("couldn't write");
		if(written == 0)
			return;
		s = s[written .. $];
	} while(s.length);
}
137 
/// Set to true by requestDeath; polled by code that wants to stop
/// cleanly once a termination signal has arrived.
__gshared bool deathRequested = false;

/// C-linkage signal handler: records that shutdown was requested.
/// It only stores a flag, and is marked nothrow @nogc so the compiler
/// rejects anything in here that could throw or allocate.
extern(C)
void requestDeath(int sig) nothrow @nogc {
	deathRequested = true;
}
143 
144 import arsd.cgi;
/++
	Relays updates from the notification port (localhost:7070) to one
	CGI request.

	When the request's Accept header is exactly "text/event-stream" the
	bytes read from the daemon are forwarded to the client as they arrive.
	Otherwise the reply is buffered and parsed, but nothing is written to
	the client yet (see the FIXME below) and 1 is returned.

	The throttledConnection param is useful for helping to get
	around browser connection limitations.

	If the user opens a bunch of tabs, these long standing
	connections can hit the per-host connection limit, breaking
	navigation until the connection times out.

	The throttle option sets a long retry period and polls
	instead of waits. This sucks, but sucks less than your whole
	site hanging because the browser is queuing your connections!

	Params:
		cgi = the request to answer
		channelPrefix = prepended to the "channel" GET variable (or used
			alone when that variable is absent)
		throttledConnection = send "retry: 15000" and ask the daemon to
			close immediately instead of holding the connection

	Returns: 1 on the non event-stream path when no shutdown was
	requested, 0 otherwise.

	Throws: Exception if the signal handler cannot be installed or a
	read fails for a reason other than being interrupted by a signal.
+/
int handleListenerGateway(Cgi cgi, string channelPrefix, bool throttledConnection = false) {
	import core.stdc.errno : errno, EINTR;
	import core.sys.posix.signal;

	cgi.setCache(false);

	sigaction_t act;
	// I want all zero everywhere else; the read() must not automatically restart for this to work.
	act.sa_handler = &requestDeath;

	// sigaction and SIGTERM come from the local import above
	if(sigaction(SIGTERM, &act, null) != 0)
		throw new Exception("sig err");

	auto f = openNetworkFd("localhost", 7070);
	scope(exit) linux.close(f);

	string[string] variables;

	variables["channel"] = channelPrefix ~ ("channel" in cgi.get ? cgi.get["channel"] : "");
	if("minimum-time" in cgi.get)
		variables["minimum-time"] = cgi.get["minimum-time"];
	if("last-message-id" in cgi.get)
		variables["last-message-id"] = cgi.get["last-message-id"];

	bool isSse;

	if(cgi.accept == "text/event-stream") {
		cgi.setResponseContentType("text/event-stream");
		isSse = true;
		// the id reported by the browser wins over the GET variable
		if(cgi.lastEventId.length)
			variables["last-message-id"] = cgi.lastEventId;

		if(throttledConnection) {
			cgi.write("retry: 15000\n");
		} else {
			cgi.write(":\n"); // the comment ensures apache doesn't skip us
		}

		cgi.flush(); // sending the headers along
	} else {
		// gotta handle it as ajax polling
		variables["close-time"] = "0"; // ask for long polling
	}


	if(throttledConnection)
		variables["close-time"] = "-1"; // close immediately

	writeToFd(f, encodeVariables(variables) ~ "\n");

	string wegot;

	// a byte buffer: read() fills at most buffer.length bytes of it
	char[4096] buffer;

	while(!deathRequested) {
		auto num = linux.read(f, buffer.ptr, buffer.length);
		if(num < 0) {
			// A signal interrupted the read. Go back to the loop
			// condition, which stops us if it was the SIGTERM handler.
			if(errno == EINTR)
				continue;
			throw new Exception("read error");
		}
		if(num == 0)
			break;

		auto chunk = buffer[0 .. num];
		if(isSse) {
			cgi.write(chunk);
			cgi.flush();
		} else {
			// copy: the buffer is overwritten by the next read
			wegot ~= chunk.idup;
		}
	}

	// this is to support older browsers
	if(!isSse && !deathRequested) {
		// we have to parse it out and reformat for plain cgi...
		auto lol = parseMessages(wegot);
		//cgi.setResponseContentType("text/json");
		// FIXME gotta reorganize my json stuff
		//cgi.write(toJson(lol));
		return 1;
	}

	return 0;
}
235 
/// One event as stored by the daemon and as returned by parseMessages.
struct Message {
	/// Event name; written and read as the "event" line.
	string type;
	/// Message identifier; written and read as the "id" line.
	string id;
	/// Payload; written and read as "data" lines.
	string data;
	/// Time stamp carried in the ":timestamp" comment line.
	long timestamp;
	/// Time to live carried in the ":ttl" comment line.
	long ttl;

	/// Operation name carried in the ":operation" comment line.
	string operation;
}
245 
246 
/**
	Polls the notification port (localhost:7070) once for the messages
	on a channel and returns them parsed.

	The request asks the daemon to close immediately ("close-time" = -1),
	so the whole reply is read until end of stream.

	Params:
		channel = channel name to query
		eventTypeFilter = when not null, only messages of this type are returned
		maxAge = when non-zero, "minimum-time" is sent as getUtcTime() - maxAge

	Throws: Exception("read error") if reading from the daemon fails.
*/
Message[] getMessages(string channel, string eventTypeFilter = null, long maxAge = 0) {
	auto f = openNetworkFd("localhost", 7070);
	scope(exit) linux.close(f);

	string[string] variables;

	variables["channel"] = channel;
	if(maxAge)
		variables["minimum-time"] = to!string(getUtcTime() - maxAge);

	variables["close-time"] = "-1"; // close immediately

	writeToFd(f, encodeVariables(variables) ~ "\n");

	string wegot;

	// a byte buffer: read() fills at most buffer.length bytes of it
	char[4096] buffer;

	for(;;) {
		auto num = linux.read(f, buffer.ptr, buffer.length);
		if(num < 0)
			throw new Exception("read error");
		if(num == 0)
			break;

		// copy: the buffer is overwritten by the next read
		wegot ~= buffer[0 .. num].idup;
	}

	return parseMessages(wegot, eventTypeFilter);
}
278 
/**
	Parses the event-stream text produced by the daemon into Message values.

	Events are separated by a blank line. Inside an event every line is
	"name: value"; a leading ':' (comment marker) is stripped first so the
	":timestamp", ":ttl" and ":operation" lines are read as fields too.
	Lines without a colon, and unknown names, are ignored. Several "data"
	lines in one event are joined with a newline between them.

	Params:
		wegot = raw text read from the daemon
		eventTypeFilter = when not null, only events whose type equals it
			are returned

	Throws: Exception("wtf") on an empty line inside an event; a
	conversion exception if a timestamp or ttl value is not a number.
*/
Message[] parseMessages(string wegot, string eventTypeFilter = null) {
	// gotta parse this since rtud writes out the format for browsers
	Message[] ret;
	foreach(message; wegot.split("\n\n")) {
		// Every event ends with a blank line, so splitting leaves an empty
		// piece after the last one. That is not an event.
		if(message.length == 0)
			continue;

		Message m;
		bool sawData = false;
		foreach(line; message.split("\n")) {
			if(line.length == 0)
				throw new Exception("wtf");
			if(line[0] == ':')
				line = line[1 .. $];

			if(line.length == 0)
				continue; // just an empty comment

			auto idx = line.indexOf(":");
			if(idx == -1)
				continue; // probably just a comment

			if(idx + 2 > line.length)
				continue; // probably just a comment too

			auto name = line[0 .. idx];
			// the value starts after ": " (colon plus one space)
			auto data = line[idx + 2 .. $];

			switch(name) {
				default: break; // do nothing
				case "timestamp":
					if(data.length)
						m.timestamp = to!long(data);
				break;
				case "ttl":
					if(data.length)
						m.ttl = to!long(data);
				break;
				case "operation":
					m.operation = data;
				break;
				case "id":
					m.id = data;
				break;
				case "event":
					m.type = data;
				break;
				case "data":
					// the writer splits the payload at newlines, one
					// "data" line each; put the newline back when joining
					if(sawData)
						m.data ~= "\n";
					m.data ~= data;
					sawData = true;
				break;
			}
		}
		if(eventTypeFilter is null || eventTypeFilter == m.type)
			ret ~= m;
	}

	return ret;
}
333 
334 
335 version(rtud_daemon) :
336 
337 import arsd.netman;
338 
339 // Real time update daemon
340 /*
341 	You push messages out to channels, where they are held for a certain length of time.
342 
343 	It can also do state with listener updates.
344 
345 	Clients ask for messages since a time, and if there are none, you hold the connection until something arrives.
346 
347 
348 	There should be D and Javascript apis for pushing and receiving.
349 
350 
351 	JS:
352 
353 	var updateObject = RealTimeUpdate();
354 
355 	updateObject.someMessage = function(msg) {
356 		// react to it
357 	}
358 
359 	updateObject.listen(channel);
360 
361 	updateObject.send(message, args); // probably shouldn't need this from JS
362 */
363 
364 /*
365 	Incoming Packet format is x-www-urlencoded. There must be no new lines
366 	in there - be sure to url encode them.
367 
368 	A message is separated by newlines.
369 */
370 
/**
	Base class for the daemon's connections: cuts the incoming bytes into
	newline-terminated packets, url-decodes each one and hands it to
	handleMessage.
*/
class RtudConnection : Connection {
	/// The daemon this connection belongs to; never null (see the invariant).
	RealTimeUpdateDaemon daemon;

	this(RealTimeUpdateDaemon daemon) {
		this.daemon = daemon;
	}

	/// Consumes every complete line currently buffered; an incomplete
	/// trailing line is left in place until more data arrives.
	override void onDataReceived() {
		import arsd.cgi;// : decodeVariables;

		for(;;) {
			auto pending = cast(string) read();

			auto newlineAt = pending.indexOf("\n");
			if(newlineAt == -1)
				return; // wait for more data

			auto packet = pending[0 .. newlineAt];
			// skip past the packet and its newline
			changeReadPosition(newlineAt + 1);

			auto decoded = decodeVariables(packet);

			handleMessage(decoded);
		}
	}

	invariant() {
		assert(daemon !is null);
	}

	/// Called once per decoded packet.
	abstract void handleMessage(string[][string] message);
}
402 
/**
	A listener's connection: subscribes to channels and receives the
	messages written to them.
*/
class NotificationConnection : RtudConnection {
	/// See handleMessage for the meaning of the values; starts at long.max.
	long closeTime;

	this(RealTimeUpdateDaemon daemon) {
		super(daemon);
		closeTime = long.max;
	}

	/// send: what channels you're interested in, a minimum time,
	/// and a close time.
	/// if the close time is negative, you are just polling curiously.
	/// if it is zero, it will close after your next batch. (long polling)
	/// anything else stays open for as long as it can in there.
	override void handleMessage(string[][string] message) {
		Channel*[] channels;

		// looks the channel up, remembers it and registers this listener
		void subscribe(string name) {
			auto channel = daemon.getChannel(name);
			channels ~= channel;
			channel.subscribeTo(this);
		}

		if(auto names = "channels" in message)
			foreach(name; *names)
				subscribe(name);

		if(auto single = "channel" in message)
			subscribe((*single)[$-1]);

		import std.algorithm;
		import std.range;

		// every stored message of the subscribed channels, oldest first
		Message*[] collectByTime() {
			Message*[] all;
			foreach(channel; channels)
				all ~= channel.messages;
			sort!((a, b) => a.timestamp < b.timestamp)(all);
			return all;
		}

		Message*[] backMessages;

		if(auto lastIds = "last-message-id" in message) {
			auto lastMessageId = (*lastIds)[$-1];

			// start at the message they already have...
			backMessages = collectByTime().find!((m, wanted) => m.id == wanted)(lastMessageId);
			// ...and skip it, since the last message is the one they got
			backMessages = backMessages.find!(m => m.id != lastMessageId);
		} else if(auto minTimes = "minimum-time" in message) {
			auto sorted = collectByTime();
			auto threshold = to!long((*minTimes)[$-1]);
			backMessages = sorted.find!((m, t) => m.timestamp >= t)(threshold);
		}

		if(auto closeTimes = "close-time" in message)
			closeTime = to!long((*closeTimes)[$-1]);

		// send the back messages immediately
		daemon.writeMessagesTo(backMessages, this, "backed-up");

//		if(closeTime > 0 && closeTime != long.max)
//			closeTime = getUtcTime() + closeTime; // FIXME: do i use this? Should I use this?
	}

	/// Unregisters this listener from every channel of the daemon.
	override void onDisconnect() {
		daemon.removeConnection(this);
	}

}
477 
/**
	A producer's connection: each packet posts, edits or deletes one
	message and forwards it to the channels named in the packet.
*/
class DataConnection : RtudConnection {
	this(RealTimeUpdateDaemon daemon) {
		super(daemon);
	}

	/**
		Applies one packet.

		Recognised keys: "operation" ("post" when absent, or "edit" /
		"delete"), "id", "type", "data", "timestamp", "ttl", and the
		destination "channels" / "channel". For keys given more than once
		the last value is used, except "channels" where every value names
		a destination.

		Throws: Exception for an unknown operation; a conversion exception
		if timestamp or ttl is not a number.
	*/
	override void handleMessage(string[][string] message) {
		// last value for key, or def when the key is missing or empty
		string getStr(string key, string def) {
			if(auto values = key in message) {
				auto s = (*values)[$ - 1];
				if(s.length)
					return s;
			}
			return def;
		}

		string operation = getStr("operation", "post");

		// Check the operation first: getMessage creates and registers a
		// message for ids it does not know, which must not happen for a
		// request that is going to be rejected anyway.
		switch(operation) {
			case "delete", "edit", "post":
				break;
			default:
				throw new Exception("unknown operation " ~ operation);
		}

		Message* m = daemon.getMessage(getStr("id", null));
		assert(m !is null);

		if(operation == "delete") {
			daemon.deleteMessage(m);
		} else {
			// we have to create the message and send it out
			m.type = getStr("type", "message");
			m.data = getStr("data", "");
			m.timestamp = to!long(getStr("timestamp", to!string(getUtcTime())));
			m.ttl = to!long(getStr("ttl", "1000"));
		}

		if(auto names = "channels" in message)
			foreach(ch; *names) {
				auto channel = daemon.getChannel(ch);
				assert(channel !is null);
				channel.writeMessage(m, operation);
			}

		if(auto single = "channel" in message) {
			auto channel = daemon.getChannel((*single)[$-1]);
			channel.writeMessage(m, operation);
		}
	}
}
525 
/// A named stream of messages together with the listeners subscribed to it.
struct Channel {
	/// Channel name.
	string id;
	/// Messages kept for listeners that ask for a backlog.
	Message*[] messages;

	// a poor man's set...
	NotificationConnection[NotificationConnection] listeningConnections;


	/// Daemon used to deliver messages to the listeners.
	RealTimeUpdateDaemon daemon;

	/**
		Sends message to every subscribed listener, tagged with operation.

		The message is also recorded in the backlog, unless it is already
		there (an edit of a stored message) or the operation is "delete"
		(a deleted message must not reappear in later backlogs).
	*/
	void writeMessage(Message* message, string operation) {
		import std.algorithm : canFind;

		if(operation != "delete" && !messages.canFind!(m => m is message))
			messages ~= message;

		foreach(k, v; listeningConnections)
			daemon.writeMessagesTo([message], v, operation);
	}

	/// Adds c to the set of listeners.
	void subscribeTo(NotificationConnection c) {
		listeningConnections[c] = c;
	}
}
546 
547 
/**
	The daemon itself: listens on port 7070 for listeners
	(NotificationConnection) and on port 7071 for producers
	(DataConnection), and owns all channels and messages.
*/
class RealTimeUpdateDaemon : NetworkManager {
	this() {
		super();
		setConnectionSpawner(7070, &createNotificationConnection);
		listen(7070);
		setConnectionSpawner(7071, &createDataConnection);
		listen(7071);
	}

	private Channel*[string] channels;
	private Message*[string] messages;

	/**
		Returns the message registered under id, creating and registering
		a new one when there is none.

		An empty id is replaced by the current getUtcTime() value as text,
		with "-" appended until it is unused.
	*/
	Message* getMessage(string id) {
		if(id.length) {
			if(auto existing = id in messages)
				return *existing;
		} else {
			id = to!string(getUtcTime());
		}

		// only a generated id can collide here; make it unique
		while(id in messages)
			id ~= "-";

		auto message = new Message;
		message.id = id;
		messages[id] = message;

		//writeln("NEW MESSAGE: ", *message);

		return message;
	}

	/// Unregisters m and removes its first occurrence from the backlog
	/// of every channel.
	void deleteMessage(Message* m) {
		messages.remove(m.id);
		foreach(k, v; channels)
		foreach(i, msg; v.messages) {
			if(msg is m) {
				v.messages = v.messages[0 .. i] ~ v.messages[i + 1 .. $];
				break;
			}
		}
	}

	/// Returns the channel named id, creating it on first use.
	Channel* getChannel(string id) {
		if(auto existing = id in channels)
			return *existing;

		auto c = new Channel;
		c.daemon = this;
		c.id = id;
		channels[id] = c;
		return c;
	}

	/**
		Writes messages to connection in event-stream format, each tagged
		with operation, then queues a disconnect if the connection's
		closeTime asks for one (negative: always; zero: only when at least
		one message was written).
	*/
	void writeMessagesTo(Message*[] messages, NotificationConnection connection, string operation) {
		foreach(messageMain; messages) {
			// NOTE(review): an expired message is dropped from storage
			// here but is still written to this connection below.
			if(messageMain.timestamp + messageMain.ttl < getUtcTime())
				deleteMessage(messageMain); // too old, kill it
			Message message = *messageMain;
			message.operation = operation;

			// this should never happen, but just in case: a newline in
			// the type would break the line-based framing. replace
			// returns a new string, so the result has to be stored.
			message.type = replace(message.type, "\n", "");
			connection.write(":timestamp: " ~ to!string(message.timestamp) ~ "\n");
			connection.write(":ttl: " ~ to!string(message.ttl) ~ "\n");
			connection.write(":operation: " ~ message.operation ~ "\n");
			if(message.id.length)
				connection.write("id: " ~ message.id ~ "\n");
			connection.write("event: " ~ message.type ~ "\n");
			// every line of the payload gets its own "data: " prefix
			connection.write("data: " ~ replace(message.data, "\n", "\ndata: ") ~ "\n");
			connection.write("\n");
		}

		if(connection.closeTime <= 0) // FIXME: other times?
			if(connection.closeTime != 0 || messages.length)
				connection.disconnect(); // note this actually queues a disconnect, so we cool
	}

	/// Removes connection from the listener set of every channel.
	void removeConnection(NotificationConnection connection) {
		foreach(channel; channels)
			channel.listeningConnections.remove(connection);
	}

	/// Spawner registered for port 7070.
	Connection createNotificationConnection() {
		return new NotificationConnection(this);
	}

	/// Spawner registered for port 7071.
	Connection createDataConnection() {
		return new DataConnection(this);
	}
}
642 
/**
	Creates the daemon and runs its event loop until proceed() returns false.

	A ConnectionException is printed and its connection is disconnected at
	once; any other Throwable is printed only. Either way the loop goes on.
*/
void rtudMain() {
	auto netman = new RealTimeUpdateDaemon;

	for(bool keepGoing = true; keepGoing; ) {
		try {
			keepGoing = netman.proceed();
		} catch(ConnectionException e) {
			writeln(e.toString());
			e.c.disconnectNow();
		} catch(Throwable e) {
			writeln(e.toString());
		}
	}
}
661 
version(standalone_rtud) {
	/// Program entry point for the standalone build: just runs the daemon.
	void main() {
		rtudMain();
	}
}