Sunday, December 25, 2016

GStreamer でテキスト処理 3: 行に分割する textparse (Parse)

1 はじめに

「GStreamer でテキスト処理 1: Core機能だけで cat, cp, dd, & tee」のおわりにで触れたように、GStreamer には「行」という概念がありません。しかし、多くの Unix ツールは、「行」を基本データーとして扱います。 grep しかり、 sed しかり。そこで、ストリーム内に流れるデーターを行毎に区切ってからダウンストリームに渡すプラグインを作ってみたいと思います。

GStreamer では、ある条件に従ってデーターを区切るエレメントを Parser と呼びます。今回は「改行があれば、そこで区切る」という条件を持った Parser を作成します。 Upstream からは、行を無視して GstBuffer にテキストデーターが詰められてやってきます。これから作成するエレメントは、一行分のデーターだけが入った GstBuffer を Downstream に流すように作ります。

GstElement から派生して作成しても良いのですが、せっかくなので GstBaseParse クラスを使ってみましょう。 GstBaseParse は、パーサーエレメントを作るときに便利なベースクラスです。パーサーとしてやらなければいけない多くの事をやってくれるので、子クラスでは「どのようにデーターをパースするか」というパーサー固有の部分を実装すれば良いようになっています。

GStreamer は、すでに多くのパーサーエレメントを持っています。パーサーエレメントは、だいたい "xxxparse" という名前になっているので、以下のコマンドでリストすることができます。多くのパーサーはすでに GstBaseParse を使って実装されているので自作パーサーを作るときにも参考になります。

$ gst-inspect-1.0 | grep parse

2 決り文句

それでは、いつものように、class のボイラープレート(決まり文句)から始めます。

前回 と違うのは、 GstElement ではなく GstBaseParse を継承するところです。

#ifndef __GST_TEXT_PARSE_H__
#define __GST_TEXT_PARSE_H__

#include <gst/base/gstbaseparse.h>

G_BEGIN_DECLS

G_DECLARE_FINAL_TYPE (GstTextParse, gst_text_parse, GST_TEXT, PARSE, GstBaseParse)

G_END_DECLS

#endif /* __GST_TEXT_PARSE_H__ */
#include <gst/base/gstbaseparse.h>
GstBaseParse クラスを使うので、そのヘッダーファイルを include
G_DECLARE_FINAL_TYPE
GstElement ではなく GstBaseParse を親クラスに指定します。
#include "gsttextparse.h"

struct _GstTextParse
{
        GstBaseParse parent;

};

G_DEFINE_TYPE(GstTextParse, gst_text_parse, GST_TYPE_BASE_PARSE)

static GstStaticPadTemplate sinktemplate =
        GST_STATIC_PAD_TEMPLATE ("sink",
                                 GST_PAD_SINK,
                                 GST_PAD_ALWAYS,
                                 GST_STATIC_CAPS_ANY);

static GstStaticPadTemplate srctemplate =
        GST_STATIC_PAD_TEMPLATE ("src",
                                 GST_PAD_SRC,
                                 GST_PAD_ALWAYS,
                                 GST_STATIC_CAPS_ANY);

static void
gst_text_parse_class_init (GstTextParseClass * klass)
{
        GstElementClass *element_class = GST_ELEMENT_CLASS(klass);

        gst_element_class_set_static_metadata (element_class,
                                               "Text Parser element",
                                               "Filter",
                                               "Parse text stream",
                                               "Yasushi SHOJI <yasushi.shoji@gmail.com");

        gst_element_class_add_static_pad_template (element_class, &srctemplate);
        gst_element_class_add_static_pad_template (element_class, &sinktemplate);
}

static void
gst_text_parse_init (G_GNUC_UNUSED GstTextParse * parse)
{
}
#include "gsttextparse.h"
先程作ったヘッダーを include
G_DEFINE_TYPE
GstTextParse クラスを作るので、その名前で作成します
GST_STATIC_PAD_TEMPLATE
パッドテンプレートは必要なので、入れてしまいます。今回もとりあえず ANY で
gst_element_class_set_static_metadata()
メタデーターの情報も、Text Parser に合せて更新しておきます。
gst_element_class_add_static_pad_template()
作成したテンプレートも追加しておきます
gst_text_parse_init()
インスタンスの初期化関数は、とりあえず空っぽにしておきましょう
#include <gst/gst.h>
#include "config.h"
#include "gsttextnoop.h"
#include "gsttextparse.h"

static gboolean
plugin_init (GstPlugin * plugin)
{
        gst_element_register(plugin, "textnoop", GST_RANK_NONE, gst_text_noop_get_type());
        gst_element_register(plugin, "textparse", GST_RANK_NONE, gst_text_parse_get_type());
        return TRUE;
}


GST_PLUGIN_DEFINE (
        GST_VERSION_MAJOR,
        GST_VERSION_MINOR,
        gsttext,
        "Text Plugins for GStreamer",
        plugin_init,
        PACKAGE_VERSION,
        "LGPL",
        "GStreamer Text Package",
        "https://github/yashi/gst-plugins-text")
gst_element_register()
textparse を追加しました
project('gst-plugins-text', 'c', version : '0.1.0')

src = ['src/gsttext.c', 'src/gsttextnoop.c', 'src/gsttextparse.c']

gst_dep = dependency('gstreamer-base-1.0', version : '>1.0')

cdata = configuration_data()
cdata.set_quoted('PACKAGE', meson.project_name())
cdata.set_quoted('PACKAGE_VERSION', meson.project_version())

configure_file(output : 'config.h', configuration : cdata)

gsttext = library('gsttext', src, dependencies : gst_dep)
src
作成した 'src/gsttextparse.c' を追加しています。左にあるソースコードから順番にビルドされるようです
dependency()
gstreamer-1.0 ではなく、 gstreamer-base-1.0 に変更しています。 GstBaseParsegstreamer-base-1.0 に含まれている為です。

3 必ず実装するもの

GstBaseParse から派生させた子クラスでは、以下の要件を必ず実装しなければなりません。

  • srcsink と名前を付けた、パットテンプレートを持つ
  • handle_frame メソッドを実装する
  • ソースパットの Caps を確定(Fixate)する

3.1 handle_frame

handle_frame() を実装していないと、 Segmentation Fault が発生します。

$ GST_PLUGIN_PATH=. gst-launch-1.0 filesrc location=/tmp/a.txt ! textparse ! filesink location=/tmp/b.txt
Setting pipeline to PAUSED ...
Pipeline is PREROLLING ...
Caught SIGSEGV

理由は、 GstBaseParse が Pure Virtual Method の handle_frame() を実装されているか確認せずに呼び出すためです。

static GstFlowReturn
gst_base_parse_handle_buffer (GstBaseParse * parse, GstBuffer * buffer,
    gint * skip, gint * flushed)
{
        :
        :
  frame = gst_base_parse_prepare_frame (parse, buffer);
  ret = klass->handle_frame (parse, frame, skip);
        :
        :
}

以下のように handle_frame() を実装しました。

static void
gst_text_parse_class_init (GstTextParseClass * klass)
{
        GstElementClass *element_class = GST_ELEMENT_CLASS(klass);
        GstBaseParseClass *base_class = GST_BASE_PARSE_CLASS(klass);

        gst_element_class_set_static_metadata (element_class,
                                               "Text Parser element",
                                               "Filter",
                                               "Parse text stream ",
                                               "Yasushi SHOJI <yasushi.shoji@gmail.com");

        gst_element_class_add_static_pad_template (element_class, &srctemplate);
        gst_element_class_add_static_pad_template (element_class, &sinktemplate);

        base_class->handle_frame = gst_text_parse_handle_frame;
}

static GstFlowReturn
gst_text_parse_handle_frame (GstBaseParse * parse,
                             GstBaseParseFrame * frame,
                             G_GNUC_UNUSED gint * skipsize)
{
        gint size;

        size = gst_buffer_get_size(frame->buffer);

        return gst_base_parse_finish_frame(parse, frame, size);
}
base_class->handle_frame =
class_init() の中で、 base_class->handle_frame を設定します
gst_text_parse_handle_frame()
handle_frame() は、 GstBaseParseGstBaseParseFrame 、そして int を貰い、 GstFlowRetrun を返す関数です
gst_base_parse_finish_frame()
finish_frame() を使って、ダウンストリームにデーターを流します。 finish_frame() は、 frame->buffer のうち「何バイトダウンストリームに流すか」という情報を第3引数の size で受けとります。そのために、 gst_buffer_get_size() を使ってバッファーのサイズを取得しています。とりあえず frame->buffer すべてをダウンストリームに流したいので、 get_size() で得たサイズをそのまま finish_frame() に渡します

3.2 Src Pad の Caps を決定

ソースパッドの Caps が Fix されていないと、実行時に Baseクラスに怒られます。

$ GST_PLUGIN_PATH=. gst-launch-1.0 filesrc location=/tmp/a.txt ! textparse ! filesink location=/tmp/b.txt
Setting pipeline to PAUSED ...
Pipeline is PREROLLING ...
ERROR: from element /GstPipeline:pipeline0/GstTextParse:textparse0: No caps set
Additional debug info:
gstbaseparse.c(2606): gst_base_parse_push_frame (): /GstPipeline:pipeline0/GstTextParse:textparse0
ERROR: pipeline doesn't want to preroll.
Setting pipeline to NULL ...
Freeing pipeline ...

GstBaseParse には、「 finish_frame() を呼ばれる前に、 Sink pad の Caps が Fixed されていなければならない」というルールがあるようです。これは 「GstBaseParse がそのように実装されている」というだけで GStreamer の制限ではありません。でも考えてみれば、 Parser なのだからダウンストリームに流すデーターは固定されているべきですよね。

ドキュメントには「Fixate the source pad caps when appropriate」と書いてあります。「when appropriate (適切な時に)」って、いつ(?)、どこ(?)なんでしょう? 作っているパーサーによって異なるのでドキュメントでは明言されていません。 gst_base_parse_finish_frame() の前には決定していなければいけないので、 handle_frame() の中でとりあえずやってしまいます。

static GstFlowReturn
gst_text_parse_handle_frame (GstBaseParse * parse,
                             GstBaseParseFrame * frame,
                             G_GNUC_UNUSED gint * skipsize)
{
        gint size;
        GstPad *srcpad = gst_element_get_static_pad(GST_ELEMENT(parse), "src");
        if (!gst_pad_has_current_caps(srcpad))
                gst_pad_set_caps(srcpad, gst_caps_new_empty_simple ("text/x-raw"));

        size = gst_buffer_get_size(frame->buffer);

        return gst_base_parse_finish_frame(parse, frame, size);
}

やっと、パイプラインが動作するようになりました。この状態で cp と同じように filesrc で指定したファイルを filesink に指定したファイルにコピーすることができます。

$ GST_DEBUG_NO_COLOR=1 GST_PLUGIN_PATH=. gst-launch-1.0 filesrc location=/tmp/a.txt ! textparse ! filesink location=/tmp/b.txt
Setting pipeline to PAUSED ...
Pipeline is PREROLLING ...
Pipeline is PREROLLED ...
Setting pipeline to PLAYING ...
New clock: GstSystemClock
Got EOS from element "pipeline0".
Execution ended after 0:00:00.000257185
Setting pipeline to PAUSED ...
Setting pipeline to READY ...
Setting pipeline to NULL ...
Freeing pipeline ...
$ diff -u /tmp/a.txt /tmp/b.txt
$

4 Finding a new line

さて、次は本題の「改行毎に区切ってダウンストリームに流す」部分を作ります。 handle_frame() に渡ってきた frame には、バッファーが付いてきています。 frame->buffer の中から、改行文字列を探しだし、一行分のデーター長を gst_base_parse_finish_frame() に教えてあげれば、完成です。難しいことは GstBaseParse クラスがやってくれるのでとても簡単です。

GstBuffer の中を触るには、 gst_buffer_map()gst_buffer_unmap() を使うというのは、前回 やった通りです。

GstBuffer が持っているデーターの中から '\n' を探し出します。文字列の中からある文字を探し出す C の関数は strstr() です。今回もこの関数を使うことができますが、GstBuffer には文字の終端を表わす NUL 文字 (\0)も入っているかもしれません。今回は、NUL 文字の事も考えて memchr() を使います。

static GstFlowReturn
gst_text_parse_handle_frame (GstBaseParse * parse,
                             GstBaseParseFrame * frame,
                             G_GNUC_UNUSED gint * skipsize)
{
        GstPad *srcpad;
        ptrdiff_t len;
        guint8 *pos;
        GstMapInfo info;

        srcpad = gst_element_get_static_pad(GST_ELEMENT(parse), "src");
        if (!gst_pad_has_current_caps(srcpad))
                gst_pad_set_caps(srcpad, gst_caps_new_empty_simple ("text/x-raw"));

        gst_buffer_map(frame->buffer, &info, GST_MAP_READ);

        pos = memchr(info.data, '\n', info.size);
        if (!pos)
                return GST_FLOW_OK;
        len = pos - info.data;

        gst_buffer_unmap(frame->buffer, &info);

        return gst_base_parse_finish_frame(parse, frame, len+1);
}
gst_buffer_map()
GstBuffer にアクセスするための情報を &info で貰う
memchr()
info.data の中に改行文字 ('\n') が含まれているか検索。含まれていたら、改行文字のアドレスを戻り値で返す
if (!pos)
貰った frame->buffer の中に改行文字を見付けられなかった場合は、 NULL が帰る。その時は GST_FLOW_OK を返せば、次のデーターと一緒にまた handle_frame() が呼ばれる仕組みになっている
len = pos - info.data;
見つかった改行文字の場所から、データーの先頭アドレスを引く事で、一行のバイト数が分る
gst_buffer_unmap()
GstBuffer へのアクセスが終ったことを伝える
gst_base_parse_finish_frame()
ダウンストリームに、一行分のデーターが入った GstBuffer を流す。

gst_base_parse_finish_frame() の第3引数で len+1 としているのは、改行も含めてダウンストリームに流したいからです。

これで、動作するはずです。実際に一行づつに分れて流れているか identity を使ってデーターを dump してみます。

$ GST_PLUGIN_PATH=. gst-launch-1.0 filesrc location=/tmp/sonnet.txt ! textparse ! identity dump=true ! filesink location=/tmp/b.txt
Setting pipeline to PAUSED ...
Pipeline is PREROLLING ...
00000000 (0x7f7d40006530): 46 52 4f 4d 20 66 61 69 72 65 73 74 20 63 72 65  FROM fairest cre
00000010 (0x7f7d40006540): 61 74 75 72 65 73 20 77 65 20 64 65 73 69 72 65  atures we desire
00000020 (0x7f7d40006550): 20 69 6e 63 72 65 61 73 65 2c 0a                  increase,.
Pipeline is PREROLLED ...
Setting pipeline to PLAYING ...
New clock: GstSystemClock
00000000 (0x7f7d4000a3f0): 54 68 61 74 20 74 68 65 72 65 62 79 20 62 65 61  That thereby bea
00000010 (0x7f7d4000a400): 75 74 79 27 73 20 72 6f 73 65 20 6d 69 67 68 74  uty's rose might
00000020 (0x7f7d4000a410): 20 6e 65 76 65 72 20 64 69 65 2c 0a               never die,.

00000000 (0x7f7d4000c3e0): 42 75 74 20 61 73 20 74 68 65 20 72 69 70 65 72  But as the riper
00000010 (0x7f7d4000c3f0): 20 73 68 6f 75 6c 64 20 62 79 20 74 69 6d 65 20   should by time
00000020 (0x7f7d4000c400): 64 65 63 65 61 73 65 2c 0a                       decease,.

00000000 (0x7f7d4000e350): 48 69 73 20 74 65 6e 64 65 72 20 68 65 69 72 20  His tender heir
00000010 (0x7f7d4000e360): 6d 69 67 68 74 20 62 65 61 72 20 68 69 73 20 6d  might bear his m
00000020 (0x7f7d4000e370): 65 6d 6f 72 79 3a 0a                             emory:.

00000000 (0x7f7d400102f0): 42 75 74 20 74 68 6f 75 2c 20 63 6f 6e 74 72 61  But thou, contra
00000010 (0x7f7d40010300): 63 74 65 64 20 74 6f 20 74 68 69 6e 65 20 6f 77  cted to thine ow
00000020 (0x7f7d40010310): 6e 20 62 72 69 67 68 74 20 65 79 65 73 2c 0a     n bright eyes,.

00000000 (0x7f7d400123a0): 46 65 65 64 27 73 74 20 74 68 79 20 6c 69 67 68  Feed'st thy ligh
00000010 (0x7f7d400123b0): 74 27 73 74 20 66 6c 61 6d 65 20 77 69 74 68 20  t'st flame with
00000020 (0x7f7d400123c0): 73 65 6c 66 2d 73 75 62 73 74 61 6e 74 69 61 6c  self-substantial
00000030 (0x7f7d400123d0): 20 66 75 65 6c 2c 0a                              fuel,.

00000000 (0x7f7d400142a0): 4d 61 6b 69 6e 67 20 61 20 66 61 6d 69 6e 65 20  Making a famine
00000010 (0x7f7d400142b0): 77 68 65 72 65 20 61 62 75 6e 64 61 6e 63 65 20  where abundance
00000020 (0x7f7d400142c0): 6c 69 65 73 2c 0a                                lies,.

00000000 (0x7f7d40015350): 54 68 79 73 65 6c 66 20 74 68 79 20 66 6f 65 2c  Thyself thy foe,
00000010 (0x7f7d40015360): 20 74 6f 20 74 68 79 20 73 77 65 65 74 20 73 65   to thy sweet se
00000020 (0x7f7d40015370): 6c 66 20 74 6f 6f 20 63 72 75 65 6c 2e 0a        lf too cruel..

00000000 (0x7f7d40016340): 54 68 6f 75 20 74 68 61 74 20 61 72 74 20 6e 6f  Thou that art no
00000010 (0x7f7d40016350): 77 20 74 68 65 20 77 6f 72 6c 64 27 73 20 66 72  w the world's fr
00000020 (0x7f7d40016360): 65 73 68 20 6f 72 6e 61 6d 65 6e 74 0a           esh ornament.

00000000 (0x7f7d40017200): 41 6e 64 20 6f 6e 6c 79 20 68 65 72 61 6c 64 20  And only herald
00000010 (0x7f7d40017210): 74 6f 20 74 68 65 20 67 61 75 64 79 20 73 70 72  to the gaudy spr
00000020 (0x7f7d40017220): 69 6e 67 2c 0a                                   ing,.

00000000 (0x7f7d400181e0): 57 69 74 68 69 6e 20 74 68 69 6e 65 20 6f 77 6e  Within thine own
00000010 (0x7f7d400181f0): 20 62 75 64 20 62 75 72 69 65 73 74 20 74 68 79   bud buriest thy
00000020 (0x7f7d40018200): 20 63 6f 6e 74 65 6e 74 0a                        content.

00000000 (0x563a714e0750): 41 6e 64 2c 20 74 65 6e 64 65 72 20 63 68 75 72  And, tender chur
00000010 (0x563a714e0760): 6c 2c 20 6d 61 6b 65 73 74 20 77 61 73 74 65 20  l, makest waste
00000020 (0x563a714e0770): 69 6e 20 6e 69 67 67 61 72 64 69 6e 67 2e 0a     in niggarding..

00000000 (0x7f7d40019190): 50 69 74 79 20 74 68 65 20 77 6f 72 6c 64 2c 20  Pity the world,
00000010 (0x7f7d400191a0): 6f 72 20 65 6c 73 65 20 74 68 69 73 20 67 6c 75  or else this glu
00000020 (0x7f7d400191b0): 74 74 6f 6e 20 62 65 2c 0a                       tton be,.

00000000 (0x563a71423760): 54 6f 20 65 61 74 20 74 68 65 20 77 6f 72 6c 64  To eat the world
00000010 (0x563a71423770): 27 73 20 64 75 65 2c 20 62 79 20 74 68 65 20 67  's due, by the g
00000020 (0x563a71423780): 72 61 76 65 20 61 6e 64 20 74 68 65 65 2e 0a     rave and thee..
Got EOS from element "pipeline0".
Execution ended after 0:00:00.000613045
Setting pipeline to PAUSED ...
Setting pipeline to READY ...
Setting pipeline to NULL ...
Freeing pipeline ...

分りやすいように、オフセット 0 の所で改行を入れてみました。ちゃんと GstBuffer には一行づつのデーターが入っているようです。

5 さいごに

どうだったでしょうか? 結局新しく実装したのは、 memchr() まわりだけです。これだけのコード量で簡単な Parser なら実装できてしまいます。

pos = memchr(info.data, '\n', info.size);
if (!pos)
        return GST_FLOW_OK;
len = pos - info.data;

しかし、残念なことにこのコードには1つだけバグがあります。入力ファイルが改行で終っていない場合には、最後の行だけ処理されません。ファイルの最後の行が改行で終っていない場合にも、最後の行を down stream に流すように改造するのは、みなさんへの宿題としたいと思います。

それでは、 Merry Christmas & Happy Coding! ;-)

No comments:

Post a Comment