Motion JPEG 配信サーバ
Motion JPEG を配信可能な Raspberry Pi 用サーバとして mjpeg-streamer があります
が、ソースをファイルまたは標準入力とし、出力を http 固定とするともう少し簡単に
コーディングできます。
以下は mjpeg-streamer を解析して自作してみた結果。
オプションは f, i, r, p の4つがあり、それぞれ入力ファイル名("-"で標準
入力)、再生インターバル(ミリ秒)、再生回数、サーバのポート番号です。
オプションは省略可能で、その場合のデフォルトは標準入力、10ミリ秒、再生回数
1回、ポート番号 8080 になります。
コンパイル&リンクには -pthread をつけて下さい。
ブラウザでの Motion JPEG 再生はなくなる方向らしいですが、firefox と android
は現状、サポートしているようです。
低遅延でそこそこ画像圧縮したい&ストリームの一部を切り抜きたいという向き
にはMotion JPEG は悪くない解だと思います。(今後は push を使え?)
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <sys/select.h> #include <sys/types.h> #include <sys/stat.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <fcntl.h> #include <pthread.h> #include <sys/select.h> #define BOUNDARY "thisisanoncrossboundary" #define STD_HDR "Connection: close\r\n" \ "Server: MJPG-DIST/0.1\r\n" \ "Cache-control: no-cache, no-store, must-revalidate, pre-check=0, post-check=0, max-age=0\r\n" \ "Pragma: no-cache\r\n" \ "Expires: Mon, 3 Jan 2000 12:34:56 GMT\r\n" typedef struct _RDCLIENT { char *command; // input command line int clen; // command line length int repeat; // repeat count int interval; // play interval in msec struct timeval timeval; // elapsed time struct sockaddr_in local; // local edge of the socket int fd; // file descriptor for the input file int sd; // socket descritor volatile int closed; // close flag } RDCLIENT; int bClosing; RDCLIENT rd[4]; pthread_t pid[4]; void showHint(void); int initServer(RDCLIENT *rd, int ad, struct sockaddr * sa, int interval, int r, char *input); void *processServer(void *x); int fillBuffer(char **buf, int *clen, int *buflen, FILE *fin); void calcDiff(struct timeval *pct, struct timeval *pst); void readCommand(RDCLIENT *rd); void writeMovieFile(RDCLIENT *rd); int main(int argc, char **argv) { struct sockaddr_in sin; int c; int state = 0; int port = 8080; int repeat = 1; int sd; int i; int interval = 10; char *filename = "-"; FILE *fsock = NULL; int ofs = 0, memsize = 1000; while ((c = getopt(argc, argv, "hp:f:i:r:")) != -1) { switch (c) { case 'h': showHint(); return 0; case 'p': port = atoi(optarg); break; case 'f': filename = optarg; break; case 'r': repeat = atoi(optarg); break; case 'i': interval = atoi(optarg); break; } } if (port == 0) { port = 8080; } if (repeat == 0) { repeat = 1; } if (interval == 0) { interval = 10; } //create servier socket informaiton sd = socket(AF_INET, SOCK_STREAM, 0); if (sd < 0) { fprintf(stderr, "failed to open socket or filename"); return 1; } memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_port = htons((short)port); // アドレスを再利用可能にする i = 1; if (setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(int)) < 0) { fprintf(stderr, "setsockopt failed"); return 1; } // サーバソケットを作成し、listen する if (bind(sd, (struct sockaddr *)&sin, sizeof(sin)) < 0) { fprintf(stderr, "bind failed"); return 1; } if (listen(sd, 2) < 0) { fprintf(stderr, "listen failed"); return 1; } while (1) { int ad, n; struct sockaddr_in sa; struct timeval tv; fd_set fds; FD_ZERO(&fds); FD_SET(sd, &fds); tv.tv_sec = 0; tv.tv_usec = 100 * 1000; n = select(sd + 1, &fds, NULL, NULL, &tv); // working thread が終了しているか調べる for (i = 0; i < sizeof(rd)/sizeof(rd[0]); ++i) { if (rd[i].closed) { pthread_join(pid[i], NULL); memset(&rd[i], 0, sizeof(rd[0])); } } // ソケット接続がなければループに戻る if (n <=0 || !FD_ISSET(sd, &fds)) continue; // ソケット接続があったので接続を受ける socklen_t slen = sizeof(sa); memset(&sa, 0, sizeof(sa)); if ((ad = accept(sd, (struct sockaddr *)&sa, (socklen_t *)&slen)) < 0) { fprintf(stderr, "accept failed"); break; } // 接続バッファの空きがあるかどうか調べる for (i = 0; i < sizeof(rd)/sizeof(rd[0]); ++i) { if (rd[i].sd == 0) { // 空いている if (initServer(&rd[i], ad, (struct sockaddr *)&sa, interval, repeat, filename) < 0) { fprintf(stderr, "input file not found"); break; } if (pthread_create(&pid[i], NULL, processServer, &rd[i]) < 0) { fprintf(stderr, "pthread_create failed"); shutdown(ad, SHUT_RDWR); close(ad); } break; } } if (i == sizeof(rd)/sizeof(rd[0])) { // 空きがない FILE *fa = fdopen(ad, "wb"); fprintf(fa, "HTTP/1.1 503 Service Temporarily Unavailable\r\n\r\n"); fflush(fa); fclose(fa); } } bClosing = 1; for (i = 0; i < sizeof(rd)/sizeof(rd[0]); ++i) { if (rd[i].sd == 0) { pthread_join(pid[i], NULL); } } return 0; } // 使い方を表示する void showHint() { const static char *usage = "Usage: mjpg_dist [-f <filename>][-i <interval>][-p <port>][-r <repeat times>]\n" \ "Set <filename> to \"-\" for standard input.\n" \ "<interval> is in msec. default is 10msec\n" \ "<port> default is 8080. <repeat times> default is 1.\n"; fprintf(stderr, "%s", usage); } // サーバを初期化する int initServer(RDCLIENT *rd, int ad, struct sockaddr *sa, int interval, int r, char *input) { memset(rd, 0, sizeof(rd)); rd->interval = interval; rd->sd = ad; memcpy(&rd->local, sa, sizeof(struct sockaddr_in)); if (strcmp(input, "-") == 0) { // 標準入力からファイル読み出し rd->repeat = 1; // 繰り返し1回強制 rd->fd = 0; // 標準入力を指定 } else { // 通常ファイルから読み出し rd->repeat = r; rd->fd = open(input, O_RDONLY); if (rd->fd < 0) return -1; } return 0; } // クライアントからの入出力を更新する void *processServer(void *x) { RDCLIENT *rd = (RDCLIENT *)x; readCommand(rd); writeMovieFile(rd); free(rd->command); rd->closed = 1; return NULL; } void putCommandChar(RDCLIENT *rd, int c) { if (rd->clen == 0) { rd->command = (char *)malloc(1000); } else if (rd->clen % 1000 == 0) { rd->command = (char *)realloc(rd->command, rd->clen + 1000); } rd->command[rd->clen++] = (char)c; } // read command from the socket void readCommand(RDCLIENT *rd) { FILE *f = fdopen(dup(rd->sd), "rb"); int state = 0; int c; while ((c = fgetc(f)) != EOF) { putCommandChar(rd, c); switch (state) { case 0: if (c == '\r') { state = 1; } break; case 1: if (c == '\n') { state = 2; } else { state = 0; } break; case 2: if (c == '\r') { state = 3; } else { state = 0; } break; case 3: if (c == '\n') { fclose(f); return; } break; } } fclose(f); } void writeMovieFile(RDCLIENT *rd) { FILE *f = fdopen(rd->sd, "wb"); FILE *fin = fdopen(rd->fd, "rb"); struct timeval start; char *buf = 0; int clen = 0, buflen = 0; if (gettimeofday(&start, NULL) < 0) { fprintf(stderr, "failed gettimeofday"); fclose(fin); fclose(f); return; } // write header for whole of the stream fprintf(f, "HTTP/1.1 200 OK\r\n" \ STD_HDR \ "Content-Type: multipart/x-mixed-replace;boundary=" BOUNDARY "\r\n" \ "\r\n" \ "--" BOUNDARY "\r\n"); fflush(f); while (rd->repeat > 0) { while (1) { struct timeval ct; // データがやってくるまで待つ clen = 0; if (fillBuffer(&buf, &clen, &buflen, fin) < 0) break; gettimeofday(&ct, NULL); calcDiff(&ct, &start); // 個別 header を出力 fprintf(f, "Content-Type: image/jpeg\r\n" \ "Content-Length: %d\r\n" \ "X-Timestamp: %d.%06d\r\n" \ "\r\n", buflen, (int)ct.tv_sec, (int)ct.tv_usec); // フレーム本体を出力 fwrite(buf, clen, 1, f); // バウンダリ文字列を出力 fprintf(f, "\r\n--" BOUNDARY); if (fileno(fin) != 0 && rd->repeat == 1) { // ファイルの最後まで読み込んだら終了記号を出力する int c = fgetc(fin); if (c == EOF) { fprintf(f, "--"); } else { ungetc(c, fin); } } fprintf(f, "\r\n"); // ソケットに出力 fflush(f); if (fileno(fin) != 0) { // 標準入力でない場合は interval 経過するまで待つ ct.tv_sec = 0; ct.tv_usec = rd->interval * 1000L; select(0, NULL, NULL, NULL, &ct); } } if (buf == NULL) break; // 読み込みファイルを先頭に seek する // stdin からの入力は seek できないので終了する if (fileno(fin) == 0) break; fseek(fin, 0L, SEEK_SET); rd->repeat--; } fclose(fin); shutdown(fileno(f), SHUT_RDWR); fclose(f); if (buf) free(buf); } void calcDiff(struct timeval *pct, struct timeval *pst) { pct->tv_sec -= pst->tv_sec; if (pct->tv_usec > pst->tv_usec) { pct->tv_usec -= pst->tv_usec; } else { pct->tv_usec += 1000000 + (pct->tv_usec - pst->tv_usec); --pct->tv_sec; } } int fillBuffer(char **pbuf, int *pclen, int *plen, FILE *fin) { int c, state = 0; char *buf = *pbuf; int clen = 0; int len = *plen; while ((c = fgetc(fin)) != EOF) { // bClosing == 1 の場合は速やかに戻る if (bClosing) { clen = 0; break; } // キャラクタを保存する if (buf == NULL) { buf = (char *)malloc(1000); } else if (clen >= len) { buf = (char *)realloc(buf, len + 1000); len += 1000; } buf[clen++] = (char)c; switch (state) { case 0: if (c == 0xff) { state = 1; } break; case 1: if (c == 0xd9) { *pbuf = buf; *pclen = clen; *plen = len; return 0; } state = 0; break; } } if (clen == 0) { // 入力ファイルが閉じられた if (buf) free(buf); buf = NULL; len = 0; } *pbuf = buf; *pclen = clen; *plen = len; return -1; }