徒然なる日々を送るソフトウェアデベロッパーの記録(2)

技術上思ったことや感じたことを気ままに記録していくブログです。さくらから移設しました。

Motion JPEG 配信サーバ

Motion JPEG を配信可能な Raspberry Pi 用サーバとして mjpeg-streamer があります
が、ソースをファイルまたは標準入力とし、出力を http 固定とするともう少し簡単に
コーディングできます。

以下は mjpeg-streamer を解析して自作してみた結果。
オプションは f, i, r, p の4つがあり、それぞれ入力ファイル名("-"で標準
入力)、再生インターバル(ミリ秒)、再生回数、サーバのポート番号です。
オプションは省略可能で、その場合のデフォルトは標準入力、10ミリ秒、再生回数
1回、ポート番号 8080 になります。
コンパイル&リンクには -pthread をつけて下さい。

ブラウザでの Motion JPEG 再生はなくなる方向らしいですが、firefoxandroid
は現状、サポートしているようです。
低遅延でそこそこ画像圧縮したい&ストリームの一部を切り抜きたいという向き
にはMotion JPEG は悪くない解だと思います。(今後は push を使え?)

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/select.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <pthread.h>
#include <sys/select.h>

#define BOUNDARY "thisisanoncrossboundary"
#define STD_HDR "Connection: close\r\n" \
        "Server: MJPG-DIST/0.1\r\n" \
        "Cache-control: no-cache, no-store, must-revalidate, pre-check=0, post-check=0, max-age=0\r\n" \
        "Pragma: no-cache\r\n" \
        "Expires: Mon, 3 Jan 2000 12:34:56 GMT\r\n"

typedef struct _RDCLIENT {
  char *command; // input command line
  int clen;     // command line length
  int repeat;   // repeat count
  int interval;  // play interval in msec
  struct timeval timeval; // elapsed time
  struct sockaddr_in local; // local edge of the socket
  int fd; // file descriptor for the input file
  int sd; // socket descritor
  volatile int closed; // close flag
} RDCLIENT; 
int bClosing;
RDCLIENT rd[4];
pthread_t pid[4];

void showHint(void);
int initServer(RDCLIENT *rd, int ad, struct sockaddr * sa, int interval, int r, char *input);
void *processServer(void *x);
int fillBuffer(char **buf, int *clen, int *buflen, FILE *fin);
void calcDiff(struct timeval *pct, struct timeval *pst);
void readCommand(RDCLIENT *rd);
void writeMovieFile(RDCLIENT *rd);

int main(int argc, char **argv) {
  struct sockaddr_in sin;
  int c;
  int state = 0;
  int port = 8080;
  int repeat = 1;
  int sd;
  int i;
  int interval = 10;
  char *filename = "-";
  FILE *fsock = NULL;
  int ofs = 0, memsize = 1000;
  while ((c = getopt(argc, argv, "hp:f:i:r:")) != -1) {
    switch (c) {
    case 'h':
      showHint();
      return 0;

    case 'p':
      port = atoi(optarg);
      break;

    case 'f':
      filename = optarg;
      break;

    case 'r':
      repeat = atoi(optarg);
      break;

    case 'i':
      interval = atoi(optarg);
      break;
    }
  }
  if (port == 0) {
    port = 8080;
  }
  if (repeat == 0) {
    repeat = 1;
  }
  if (interval == 0) {
    interval = 10;
  }

  //create servier socket informaiton
  sd = socket(AF_INET, SOCK_STREAM, 0);
  if (sd < 0) {
    fprintf(stderr, "failed to open socket or filename");
    return 1;
  }
  memset(&sin, 0, sizeof(sin));
  sin.sin_family = AF_INET;
  sin.sin_port = htons((short)port);
  // アドレスを再利用可能にする
  i = 1;
  if (setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(int)) < 0) {
    fprintf(stderr, "setsockopt failed");
    return 1;
  }

  // サーバソケットを作成し、listen する
  if (bind(sd, (struct sockaddr *)&sin, sizeof(sin)) < 0) {
    fprintf(stderr, "bind failed");
    return 1;
  }
  if (listen(sd, 2) < 0) {
    fprintf(stderr, "listen failed");
    return 1;
  }

  while (1) {
    int ad, n;
    struct sockaddr_in sa;
    struct timeval tv;

    fd_set fds;
    FD_ZERO(&fds);
    FD_SET(sd, &fds);
    tv.tv_sec = 0;
    tv.tv_usec = 100 * 1000;
    n = select(sd + 1, &fds, NULL, NULL, &tv);

    // working thread が終了しているか調べる
    for (i = 0; i < sizeof(rd)/sizeof(rd[0]); ++i) {
      if (rd[i].closed) {
        pthread_join(pid[i], NULL);
        memset(&rd[i], 0, sizeof(rd[0]));
      }
    }

    // ソケット接続がなければループに戻る
    if (n <=0 || !FD_ISSET(sd, &fds)) continue;

    // ソケット接続があったので接続を受ける
    socklen_t slen = sizeof(sa);
    memset(&sa, 0, sizeof(sa));
    if ((ad = accept(sd, (struct sockaddr *)&sa, (socklen_t *)&slen)) < 0) {
      fprintf(stderr, "accept failed");
      break;
    }

    // 接続バッファの空きがあるかどうか調べる
    for (i = 0; i < sizeof(rd)/sizeof(rd[0]); ++i) {
      if (rd[i].sd == 0) {
        // 空いている
        if (initServer(&rd[i], ad, (struct sockaddr *)&sa, interval, repeat, filename) < 0) {
          fprintf(stderr, "input file not found");
          break;
        }
        if (pthread_create(&pid[i], NULL, processServer, &rd[i]) < 0) {
          fprintf(stderr, "pthread_create failed");
          shutdown(ad, SHUT_RDWR);
          close(ad);
        }
        break;
      }
    }
    if (i == sizeof(rd)/sizeof(rd[0])) {
      // 空きがない
      FILE *fa = fdopen(ad, "wb");
      fprintf(fa, "HTTP/1.1 503 Service Temporarily Unavailable\r\n\r\n");
      fflush(fa);
      fclose(fa);
    }
  }

  bClosing = 1;
  for (i = 0; i < sizeof(rd)/sizeof(rd[0]); ++i) {
    if (rd[i].sd == 0) {
      pthread_join(pid[i], NULL);
    }
  }

  return 0;
}

// 使い方を表示する
void showHint() {
  const static char *usage = "Usage: mjpg_dist [-f <filename>][-i <interval>][-p <port>][-r <repeat times>]\n" \
  "Set <filename> to \"-\" for standard input.\n" \
  "<interval> is in msec. default is 10msec\n" \
  "<port> default is 8080. <repeat times> default is 1.\n";
  fprintf(stderr, "%s", usage);
}

// サーバを初期化する
int initServer(RDCLIENT *rd, int ad, struct sockaddr *sa, int interval, int r, char *input) {
  memset(rd, 0, sizeof(rd));
  rd->interval = interval;
  rd->sd = ad;
  memcpy(&rd->local, sa, sizeof(struct sockaddr_in));
  if (strcmp(input, "-") == 0) {
    // 標準入力からファイル読み出し
    rd->repeat = 1; // 繰り返し1回強制
    rd->fd = 0; // 標準入力を指定
  } else {
    // 通常ファイルから読み出し
    rd->repeat = r;
    rd->fd = open(input, O_RDONLY);
    if (rd->fd < 0) return -1;
  }
  return 0;
}

// クライアントからの入出力を更新する
void *processServer(void *x) {
  RDCLIENT *rd = (RDCLIENT *)x;
  readCommand(rd);
  writeMovieFile(rd);
  free(rd->command);
  rd->closed = 1;
  return NULL;
}

void putCommandChar(RDCLIENT *rd, int c) {
  if (rd->clen == 0) {
    rd->command = (char *)malloc(1000);
  } else if (rd->clen % 1000 == 0) {
    rd->command = (char *)realloc(rd->command, rd->clen + 1000);
  }
  rd->command[rd->clen++] = (char)c;
}
// read command from the socket
void readCommand(RDCLIENT *rd) {
  FILE *f = fdopen(dup(rd->sd), "rb");
  int state = 0;
  int c;

  while ((c = fgetc(f)) != EOF) {
    putCommandChar(rd, c);
    switch (state) {
    case 0:
      if (c == '\r') {
        state = 1;
      }
      break;
    case 1:
      if (c == '\n') {
        state = 2;
      } else {
        state = 0;
      }
      break;
    case 2:
      if (c == '\r') {
        state = 3;
      } else {
        state = 0;
      }
      break;
    case 3:
      if (c == '\n') {
        fclose(f);
        return;
      }
      break;
    }
  }
  fclose(f);
}

void writeMovieFile(RDCLIENT *rd) {
  FILE *f = fdopen(rd->sd, "wb");
  FILE *fin = fdopen(rd->fd, "rb");
  struct timeval start;
  char *buf = 0;
  int clen = 0, buflen = 0;

  if (gettimeofday(&start, NULL) < 0) {
    fprintf(stderr, "failed gettimeofday");
    fclose(fin);
    fclose(f);
    return;
  }

  // write header for whole of the stream
  fprintf(f, "HTTP/1.1 200 OK\r\n" \
     STD_HDR \
     "Content-Type: multipart/x-mixed-replace;boundary=" BOUNDARY "\r\n" \
     "\r\n" \
     "--" BOUNDARY "\r\n");
  fflush(f);

  while (rd->repeat > 0) {
    while (1) {
      struct timeval ct;

      // データがやってくるまで待つ
      clen = 0;
      if (fillBuffer(&buf, &clen, &buflen, fin) < 0) break;
      gettimeofday(&ct, NULL);
      calcDiff(&ct, &start);
  
      // 個別 header を出力
      fprintf(f, "Content-Type: image/jpeg\r\n" \
        "Content-Length: %d\r\n" \
        "X-Timestamp: %d.%06d\r\n" \
        "\r\n", buflen, (int)ct.tv_sec, (int)ct.tv_usec);

      // フレーム本体を出力
      fwrite(buf, clen, 1, f);

      // バウンダリ文字列を出力
      fprintf(f, "\r\n--" BOUNDARY);

      if (fileno(fin) != 0 && rd->repeat == 1) {
        // ファイルの最後まで読み込んだら終了記号を出力する
        int c = fgetc(fin);
        if (c == EOF) {
          fprintf(f, "--");
        } else {
          ungetc(c, fin);
        }
      }

      fprintf(f, "\r\n");

      // ソケットに出力
      fflush(f);

      if (fileno(fin) != 0) {
        // 標準入力でない場合は interval 経過するまで待つ
        ct.tv_sec = 0;
        ct.tv_usec = rd->interval * 1000L;
        select(0, NULL, NULL, NULL, &ct);
      }
    }
    if (buf == NULL) break;

    // 読み込みファイルを先頭に seek する
    // stdin からの入力は seek できないので終了する
    if (fileno(fin) == 0) break;
    fseek(fin, 0L, SEEK_SET);
    rd->repeat--;
  }
  fclose(fin);
  shutdown(fileno(f), SHUT_RDWR);
  fclose(f);
  if (buf) free(buf);
}

void calcDiff(struct timeval *pct, struct timeval *pst) {
  pct->tv_sec -= pst->tv_sec;
  if (pct->tv_usec > pst->tv_usec) {
    pct->tv_usec -= pst->tv_usec;
  } else {
    pct->tv_usec += 1000000 + (pct->tv_usec - pst->tv_usec);
    --pct->tv_sec;
  }
}

int fillBuffer(char **pbuf, int *pclen, int *plen, FILE *fin) {
  int c, state = 0;
  char *buf = *pbuf;
  int clen = 0;
  int len = *plen;

  while ((c = fgetc(fin)) != EOF) {
    // bClosing == 1 の場合は速やかに戻る
    if (bClosing) {
      clen = 0;
      break;
    }

    // キャラクタを保存する
    if (buf == NULL) {
      buf = (char *)malloc(1000);
    } else if (clen >= len) {
      buf = (char *)realloc(buf, len + 1000);
      len += 1000;
    }
    buf[clen++] = (char)c;

    switch (state) {
    case 0:
      if (c == 0xff) {
        state = 1;
      }
      break;
    case 1:
      if (c == 0xd9) {
        *pbuf = buf;
        *pclen = clen;
        *plen = len;
        return 0;
      }
      state = 0;
      break;
    }
  }

  if (clen == 0) {
    // 入力ファイルが閉じられた
    if (buf) free(buf);
    buf = NULL;
    len = 0;
  }
  *pbuf = buf;
  *pclen = clen;
  *plen = len;
  return -1;
}