syslog-ng-users April 2011 archive
Main Archive Page > Month Archives  > syslog-ng-users archives
syslog-ng-users: [syslog-ng] [PATCH 2/3] afmongodb: Port to valu

[syslog-ng] [PATCH 2/3] afmongodb: Port to value-pairs().

From: Gergely Nagy <algernon_at_nospam>
Date: Sun Apr 24 2011 - 09:29:52 GMT
To: syslog-ng@lists.balabit.hu

Ported from custom, explicit key/value pairs to the far more flexible
value-pairs() solution. By default, the driver uses a custom scope:
selected_macros and nv_pairs, with a few patterns excluded.

The patch also turns the collection() parameter into a plain string
(from a templatable string), to considerably reduce the work needed to
insert messages into the database.

Signed-off-by: Gergely Nagy <algernon@balabit.hu>
--- modules/afmongodb/afmongodb-grammar.ym | 4 +- modules/afmongodb/afmongodb-parser.c | 2 - modules/afmongodb/afmongodb.c | 154 +++++++++---------------------- modules/afmongodb/afmongodb.h | 4 +- 4 files changed, 48 insertions(+), 116 deletions(-) diff --git a/modules/afmongodb/afmongodb-grammar.ym b/modules/afmongodb/afmongodb-grammar.ym index a19e2b4..7b5e680 100644 --- a/modules/afmongodb/afmongodb-grammar.ym +++ b/modules/afmongodb/afmongodb-grammar.ym @@ -48,7 +48,6 @@ extern LogDriver *last_driver; %token KW_MONGODB %token KW_COLLECTION -%token KW_KEYS %% @@ -70,10 +69,9 @@ afmongodb_option | KW_PORT '(' LL_NUMBER ')' { afmongodb_dd_set_port(last_driver, $3); } | KW_DATABASE '(' string ')' { afmongodb_dd_set_database(last_driver, $3); free($3); } | KW_COLLECTION '(' string ')' { afmongodb_dd_set_collection(last_driver, $3); free($3); } - | KW_VALUES '(' string_list ')' { afmongodb_dd_set_values(last_driver, $3); } - | KW_KEYS '(' string_list ')' { afmongodb_dd_set_keys(last_driver, $3); } | KW_USERNAME '(' string ')' { afmongodb_dd_set_user(last_driver, $3); free($3); } | KW_PASSWORD '(' string ')' { afmongodb_dd_set_password(last_driver, $3); free($3); } + | value_pair_stmt { afmongodb_dd_set_value_pairs(last_driver, last_value_pairs); } | dest_driver_option ; diff --git a/modules/afmongodb/afmongodb-parser.c b/modules/afmongodb/afmongodb-parser.c index 5f9161e..67a9b0b 100644 --- a/modules/afmongodb/afmongodb-parser.c +++ b/modules/afmongodb/afmongodb-parser.c @@ -34,8 +34,6 @@ static CfgLexerKeyword afmongodb_keywords[] = { { "port", KW_PORT }, { "database", KW_DATABASE }, { "collection", KW_COLLECTION }, - { "keys", KW_KEYS }, - { "values", KW_VALUES }, { "username", KW_USERNAME }, { "password", KW_PASSWORD }, { "log_fifo_size", KW_LOG_FIFO_SIZE }, diff --git a/modules/afmongodb/afmongodb.c b/modules/afmongodb/afmongodb.c index 284066a..94f0ce1 100644 --- a/modules/afmongodb/afmongodb.c +++ b/modules/afmongodb/afmongodb.c @@ -47,7 +47,7 @@ typedef struct /* Shared between main/writer; only read by the writer, never written */ gchar *db; - LogTemplate *coll; + gchar *coll; gchar *host; gint port; @@ -55,12 +55,6 @@ typedef struct gchar *user; gchar *password; - GList *keys; - GList *values; - - gint num_fields; - MongoDBField *fields; - time_t time_reopen; guint32 *dropped_messages; @@ -68,6 +62,8 @@ typedef struct time_t last_msg_stamp; + ValuePairs *vp; + /* Thread related stuff; shared */ GThread *writer_thread; GMutex *queue_mutex; @@ -84,11 +80,9 @@ typedef struct mongo_connection *conn; gint32 seq_num; - GString *current_namespace; - gint ns_prefix_len; + gchar *ns; GString *current_value; - bson *bson_sel, *bson_upd, *bson_set; } MongoDBDestDriver; @@ -145,26 +139,17 @@ afmongodb_dd_set_collection(LogDriver *d, const gchar *collection) { MongoDBDestDriver *self = (MongoDBDestDriver *)d; - log_template_unref(self->coll); - self->coll = log_template_new(log_pipe_get_config(&d->super), NULL, collection); + g_free(self->coll); + self->coll = g_strdup(collection); } void -afmongodb_dd_set_keys(LogDriver *d, GList *keys) +afmongodb_dd_set_value_pairs(LogDriver *d, ValuePairs *vp) { MongoDBDestDriver *self = (MongoDBDestDriver *)d; - string_list_free(self->keys); - self->keys = keys; -} - -void -afmongodb_dd_set_values(LogDriver *d, GList *values) -{ - MongoDBDestDriver *self = (MongoDBDestDriver *)d; - - string_list_free(self->values); - self->values = values; + value_pairs_free (self->vp); + self->vp = vp; } /* @@ -177,7 +162,7 @@ afmongodb_dd_format_stats_instance(MongoDBDestDriver *self) static gchar persist_name[1024]; g_snprintf(persist_name, sizeof(persist_name), - "mongodb,%s,%u,%s,%s", self->host, self->port, self->db, self->coll->template); + "mongodb,%s,%u,%s,%s", self->host, self->port, self->db, self->coll); return persist_name; } @@ -187,7 +172,7 @@ afmongodb_dd_format_persist_name(MongoDBDestDriver *self) static gchar persist_name[1024]; g_snprintf(persist_name, sizeof(persist_name), - "afmongodb(%s,%u,%s,%s)", self->host, self->port, self->db, self->coll->template); + "afmongodb(%s,%u,%s,%s)", self->host, self->port, self->db, self->coll); return persist_name; } @@ -244,11 +229,23 @@ afmongodb_dd_connect(MongoDBDestDriver *self, gboolean reconnect) /* * Worker thread */ +static gboolean +afmongodb_vp_foreach (const gchar *name, const gchar *value, + gpointer user_data) +{ + bson *bson_set = (bson *)user_data; + + if (name[0] == '.') + bson_append_string (bson_set, name + 1, value, -1); + else + bson_append_string (bson_set, name, value, -1); + + return FALSE; +} static gboolean afmongodb_worker_insert (MongoDBDestDriver *self) { - gint i; gboolean success; mongo_packet *p; guint8 *oid; @@ -274,25 +271,15 @@ afmongodb_worker_insert (MongoDBDestDriver *self) g_free (oid); bson_finish (self->bson_sel); - g_string_truncate(self->current_namespace, self->ns_prefix_len); - log_template_append_format(self->coll, msg, NULL, LTZ_LOCAL, - self->seq_num, NULL, self->current_namespace); - - for (i = 0; i < self->num_fields; i++) - { - log_template_format(self->fields[i].value, msg, NULL, LTZ_SEND, - self->seq_num, NULL, self->current_value); - if (self->current_value->len) - bson_append_string(self->bson_set, self->fields[i].name, - self->current_value->str, -1); - } + value_pairs_foreach (self->vp, afmongodb_vp_foreach, + msg, self->seq_num, self->bson_set); bson_finish (self->bson_set); bson_append_document (self->bson_upd, "$set", self->bson_set); bson_finish (self->bson_upd); - p = mongo_wire_cmd_update (1, self->current_namespace->str, 1, + p = mongo_wire_cmd_update (1, self->ns, 1, self->bson_sel, self->bson_upd); if (!mongo_packet_send (self->conn, p)) @@ -335,12 +322,8 @@ afmongodb_worker_thread (gpointer arg) NULL); success = afmongodb_dd_connect(self, FALSE); - self->current_namespace = g_string_sized_new(64); - self->ns_prefix_len = strlen (self->db) + 1; - self->current_namespace = - g_string_append_c (g_string_assign (self->current_namespace, - self->db), '.'); + self->ns = g_strconcat (self->db, ".", self->coll, NULL); self->current_value = g_string_sized_new(256); @@ -383,7 +366,7 @@ afmongodb_worker_thread (gpointer arg) afmongodb_dd_disconnect(self); - g_string_free (self->current_namespace, TRUE); + g_free (self->ns); g_string_free (self->current_value, TRUE); bson_free (self->bson_sel); @@ -421,15 +404,25 @@ afmongodb_dd_init(LogPipe *s) MongoDBDestDriver *self = (MongoDBDestDriver *)s; GlobalConfig *cfg = log_pipe_get_config(s); - gint num_keys, num_values, i; - GList *key, *value; - if (!log_dest_driver_init_method(s)) return FALSE; if (cfg) self->time_reopen = cfg->time_reopen; + if (!self->vp) + { + self->vp = value_pairs_new(cfg); + value_pairs_add_scope(self->vp, VALUE_PAIR_SCOPE_SELECTED_MACROS); + value_pairs_add_scope(self->vp, VALUE_PAIR_SCOPE_NV_PAIRS); + value_pairs_exclude_glob_pattern(self->vp, "R_*"); + value_pairs_exclude_glob_pattern(self->vp, "S_*"); + value_pairs_exclude_glob_pattern(self->vp, "HOST_FROM"); + value_pairs_exclude_glob_pattern(self->vp, "LEGACY_MSGHDR"); + value_pairs_exclude_glob_pattern(self->vp, "MSG"); + value_pairs_exclude_glob_pattern(self->vp, "SDATA"); + } + msg_verbose("Initializing MongoDB destination", evt_tag_str("host", self->host), evt_tag_int("port", self->port), @@ -438,29 +431,6 @@ afmongodb_dd_init(LogPipe *s) self->queue = log_dest_driver_acquire_queue(&self->super, afmongodb_dd_format_persist_name(self)); - if (!self->fields) - { - num_keys = g_list_length(self->keys); - num_values = g_list_length(self->values); - - if (num_keys != num_values) - { - msg_error("The number of keys and values do not match", - evt_tag_int("num_keys", num_keys), - evt_tag_int("num_values", num_values), - NULL); - return FALSE; - } - self->num_fields = num_keys; - self->fields = g_new0(MongoDBField, num_keys); - - for (i = 0, key = self->keys, value = self->values; key && value; i++, key = key->next, value = value->next) - { - self->fields[i].name = g_strdup(key->data); - self->fields[i].value = log_template_new(cfg, NULL, (gchar *)value->data); - } - } - stats_register_counter(0, SCS_MONGODB | SCS_DESTINATION, self->super.super.id, afmongodb_dd_format_stats_instance(self), SC_TYPE_STORED, &self->stored_messages); @@ -499,7 +469,6 @@ static void afmongodb_dd_free(LogPipe *d) { MongoDBDestDriver *self = (MongoDBDestDriver *)d; - gint i; g_mutex_free(self->suspend_mutex); g_mutex_free(self->queue_mutex); @@ -508,20 +477,13 @@ afmongodb_dd_free(LogPipe *d) if (self->queue) log_queue_unref(self->queue); - for (i = 0; i < self->num_fields; i++) - { - g_free(self->fields[i].name); - log_template_unref(self->fields[i].value); - } - - g_free(self->fields); g_free(self->db); - log_template_unref(self->coll); + g_free(self->coll); g_free(self->user); g_free(self->password); g_free(self->host); - string_list_free(self->keys); - string_list_free(self->values); + + value_pairs_free(self->vp); log_dest_driver_free(d); } @@ -548,30 +510,6 @@ afmongodb_dd_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_optio * Plugin glue. */ -const gchar *default_keys[] = -{ - "date", - "facility", - "level", - "host", - "program", - "pid", - "message", - NULL -}; - -const gchar *default_values[] = -{ - "${R_YEAR}-${R_MONTH}-${R_DAY} ${R_HOUR}:${R_MIN}:${R_SEC}", - "$FACILITY", - "$LEVEL", - "$HOST", - "$PROGRAM", - "$PID", - "$MSGONLY", - NULL -}; - LogDriver * afmongodb_dd_new(void) { @@ -589,8 +527,6 @@ afmongodb_dd_new(void) afmongodb_dd_set_port((LogDriver *)self, 27017); afmongodb_dd_set_database((LogDriver *)self, "syslog"); afmongodb_dd_set_collection((LogDriver *)self, "messages"); - afmongodb_dd_set_keys((LogDriver *)self, string_array_to_list(default_keys)); - afmongodb_dd_set_values((LogDriver *)self, string_array_to_list(default_values)); init_sequence_number(&self->seq_num); diff --git a/modules/afmongodb/afmongodb.h b/modules/afmongodb/afmongodb.h index e54faf2..bd7b258 100644 --- a/modules/afmongodb/afmongodb.h +++ b/modules/afmongodb/afmongodb.h @@ -25,6 +25,7 @@ #define AFMONGODB_H_INCLUDED #include "driver.h" +#include "value-pairs.h" LogDriver *afmongodb_dd_new(void); @@ -32,9 +33,8 @@ void afmongodb_dd_set_host(LogDriver *d, const gchar *host); void afmongodb_dd_set_port(LogDriver *d, gint port); void afmongodb_dd_set_database(LogDriver *d, const gchar *database); void afmongodb_dd_set_collection(LogDriver *d, const gchar *collection); -void afmongodb_dd_set_values(LogDriver *d, GList *values); -void afmongodb_dd_set_keys(LogDriver *d, GList *keys); void afmongodb_dd_set_user(LogDriver *d, const gchar *user); void afmongodb_dd_set_password(LogDriver *d, const gchar *password); +void afmongodb_dd_set_value_pairs(LogDriver *d, ValuePairs *vp); #endif -- 1.7.2.5 ______________________________________________________________________________ Member info: https://lists.balabit.hu/mailman/listinfo/syslog-ng Documentation: http://www.balabit.com/support/documentation/?product=syslog-ng FAQ: http://www.campin.net/syslog-ng/faq.html