Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ Required properties are in bold.
| es.client.transport.nodes_sampler_interval | 5s | How often to sample / ping the nodes listed and connected |
| es.index | default | Index name to be used to store the documents |
| es.type | default | Type to be used to store the documents |
| es.index.builder |com.cognitree.<br>flume.sink.<br>elasticsearch.<br>StaticIndexBuilder | Implementation of com.cognitree.flume.sink.elasticsearch.IndexBuilder interface |
| es.index.builder |com.cognitree.<br>flume.sink.<br>elasticsearch.<br>StaticIndexBuilder | Implementation of com.cognitree.flume.sink.elasticsearch.IndexBuilder interface|
| es.serializer |com.cognitree.<br>flume.sink.<br>elasticsearch.<br>SimpleSerializer | Implementation of com.cognitree.flume.sink.elasticsearch.Serializer interface |
| es.serializer.csv.fields | - | Comma separated csv field name with data type i.e. column1:type1,column2:type2, Supported data types are string, boolean, int and float |
| es.serializer.csv.delimiter | ,(comma) | Delimiter for the data in flume event body|
| es.serializer.avro.schema.file | - | Absolute path for the schema configuration file |

Example of agent named agent
Example of agent named agent:

````
agent.channels = es_channel
Expand Down Expand Up @@ -74,3 +74,17 @@ Example of agent named agent
agent.sinks.es_sink.es.serializer.csv.delimiter=,
agent.sinks.es_sink.es.serializer.avro.schema.file=/usr/local/schema.avsc
````

#### Available index builders

##### com.cognitree.flume.sink.elasticsearch.HeaderBasedIndexBuilder

This builder has no configurable parameters. Set the Flume event header 'index' to customize the target index name (default index name: 'default') and the header 'type' to customize the target document type (default document type: 'default').

##### com.cognitree.flume.sink.elasticsearch.TimestampBasedIndexBuilder
This builder uses the *timestamp* or *@timestamp* header, which is expected to be a unix timestamp in milliseconds, to build the index name, e.g. you can create names like: _my-awesome-index-2019-01-01_.

| Property Name | Default | Description |
|--------------------------------------------|--------------|:----------------------------------------------------------------------------------------------|
| es.index.builder.date.format | - | Sets the format of the date postfix you want to have, given as a date pattern (e.g. `yyyy-MM-dd`). Supports the ISO 8601 standard. If it is not set, no date postfix is created|
| es.index.builder.date.timeZone | UTC | Sets the timezone used to format the timestamp (uses _TimeZone.getTimeZone()_, so it supports IDs like 'UTC', 'GMT+03:00', 'Europe/Samara', etc.; an unrecognized ID falls back to GMT). <br> Ignored if *date.format* is not set.|
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ public class Constants {

// Property key naming the IndexBuilder implementation class, and the class used by default.
public static final String ES_INDEX_BUILDER = "es.index.builder";
public static final String DEFAULT_ES_INDEX_BUILDER = "com.cognitree.flume.sink.elasticsearch.StaticIndexBuilder";
// Property key for the date pattern of the index-name suffix built by TimestampBasedIndexBuilder.
public static final String ES_INDEX_BUILDER_DATE_FORMAT = "es.index.builder.date.format";
// Property key for the time zone ID used when formatting that suffix, and the ID used when it is not set.
public static final String ES_INDEX_BUILDER_DATE_TIME_ZONE = "es.index.builder.date.timeZone";
public static final String DEFAULT_ES_INDEX_BUILDER_DATE_TIME_ZONE = "UTC";

// Property key naming the Serializer implementation class, and the class used by default.
public static final String ES_SERIALIZER = "es.serializer";
public static final String DEFAULT_ES_SERIALIZER = "com.cognitree.flume.sink.elasticsearch.SimpleSerializer";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2017 Cognitree Technologies
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.cognitree.flume.sink.elasticsearch;

import org.apache.commons.lang.time.FastDateFormat;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.formatter.output.BucketPath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.TimeZone;

import static com.cognitree.flume.sink.elasticsearch.Constants.*;

/**
 * {@link IndexBuilder} that appends a date suffix, derived from the event timestamp, to the
 * configured index name (for example {@code my-index-2019-01-01}).
 *
 * <p>The suffix is only produced when a date format has been configured through
 * {@code es.index.builder.date.format}; otherwise the plain (escaped) index name is returned.
 * The event timestamp is obtained through {@link TimestampedEvent}.
 */
public class TimestampBasedIndexBuilder implements IndexBuilder {

    private static final Logger logger = LoggerFactory.getLogger(TimestampBasedIndexBuilder.class);

    /** GMT is what {@link TimeZone#getTimeZone(String)} returns for an ID it cannot understand. */
    private static final String FALLBACK_TIME_ZONE_ID = "GMT";

    private String index;
    private String type;
    private String dateFormat;
    private String dateTimeZone;

    /** Formatter for the date suffix; {@code null} means "do not append a suffix". */
    private FastDateFormat fastDateFormat = null;

    /**
     * Builds the index name for the given event.
     *
     * @param event event whose headers are used to escape the index name and, when a date
     *              format is configured, to determine the timestamp of the suffix
     * @return the escaped index name, followed by {@code '-'} and the formatted timestamp when
     *         a date format is configured
     */
    @Override
    public String getIndex(Event event) {
        // The builder may be used without configure() having been called.
        String indexName = (index != null) ? index : DEFAULT_ES_INDEX;
        indexName = BucketPath.escapeString(indexName, event.getHeaders());

        if (fastDateFormat == null) {
            // No suffix requested: skip copying the event and parsing its timestamp header.
            return indexName;
        }

        long timestamp = new TimestampedEvent(event).getTimestamp();
        return indexName + '-' + fastDateFormat.format(timestamp);
    }

    /**
     * Returns the configured document type.
     *
     * @param event ignored
     * @return the configured type, or {@code DEFAULT_ES_TYPE} when none is configured
     */
    @Override
    public String getType(Event event) {
        return (type != null) ? type : DEFAULT_ES_TYPE;
    }

    /**
     * This builder does not derive document ids from events.
     *
     * @param event ignored
     * @return always {@code null}
     */
    @Override
    public String getId(Event event) {
        return null;
    }

    /**
     * Reads index name, type, date format and time zone from the context.
     *
     * <p>The time zone is only consulted when a date format is set; it defaults to
     * {@code DEFAULT_ES_INDEX_BUILDER_DATE_TIME_ZONE}.
     *
     * @param context configuration to read the builder properties from
     */
    @Override
    public void configure(Context context) {
        this.index = Util.getContextValue(context, ES_INDEX);
        if (this.index == null) {
            this.index = DEFAULT_ES_INDEX;
        }
        this.type = Util.getContextValue(context, ES_TYPE);
        this.dateFormat = Util.getContextValue(context, ES_INDEX_BUILDER_DATE_FORMAT);
        this.dateTimeZone = Util.getContextValue(context, ES_INDEX_BUILDER_DATE_TIME_ZONE);

        // Drop any formatter left over from a previous configure() call so that removing the
        // date format really disables the suffix.
        this.fastDateFormat = null;

        if (this.dateFormat != null) {
            if (this.dateTimeZone == null) {
                this.dateTimeZone = DEFAULT_ES_INDEX_BUILDER_DATE_TIME_ZONE;
            }

            TimeZone timeZone = TimeZone.getTimeZone(dateTimeZone);
            // getTimeZone() never fails: an ID it does not understand silently becomes GMT.
            if (FALLBACK_TIME_ZONE_ID.equals(timeZone.getID())
                    && !FALLBACK_TIME_ZONE_ID.equals(dateTimeZone)) {
                logger.warn("Time zone [{}] is not recognized, falling back to [{}]",
                        dateTimeZone, timeZone.getID());
            }

            this.fastDateFormat = FastDateFormat.getInstance(dateFormat, timeZone);
        }
        logger.info("Timestamp based index builder, name [{}] type [{}] date format [{}] date time zone [{}] ",
                new Object[]{this.index, this.type, this.dateFormat, this.dateTimeZone});
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2017 Cognitree Technologies
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.cognitree.flume.sink.elasticsearch;

import com.google.common.collect.Maps;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import org.joda.time.DateTimeUtils;

import java.util.Map;

/**
 * Copy of a Flume {@link Event} that always carries a timestamp.
 *
 * <p>The timestamp is read from the {@code timestamp} header, or from {@code @timestamp} when
 * the former is blank. When both are blank the current time in milliseconds is used and
 * stored in the {@code timestamp} header of this copy. The wrapped event is not modified.
 */
public final class TimestampedEvent extends SimpleEvent {
    private final long timestamp;

    /**
     * Copies body and headers of {@code base} and resolves the event timestamp.
     *
     * @param base event to copy
     * @throws NumberFormatException if the timestamp header is present but is not a valid
     *                               {@code long} (surrounding whitespace is tolerated)
     */
    TimestampedEvent(Event base) {
        setBody(base.getBody());
        // Work on a copy so the headers of the wrapped event stay untouched.
        Map<String, String> headers = Maps.newHashMap(base.getHeaders());
        String timestampString = headers.get("timestamp");
        if (StringUtils.isBlank(timestampString)) {
            timestampString = headers.get("@timestamp");
        }
        if (StringUtils.isBlank(timestampString)) {
            this.timestamp = DateTimeUtils.currentTimeMillis();
            headers.put("timestamp", String.valueOf(timestamp));
        } else {
            // parseLong avoids boxing; trim() because isBlank() lets padded values through.
            this.timestamp = Long.parseLong(timestampString.trim());
        }
        setHeaders(headers);
    }

    /**
     * @return the timestamp resolved in the constructor
     */
    long getTimestamp() {
        return timestamp;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package com.cognitree.flume.sink.elasticsearch;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import org.junit.Before;
import org.junit.Test;

import java.util.HashMap;
import java.util.Map;

import static com.cognitree.flume.sink.elasticsearch.Constants.*;
import static org.junit.Assert.*;

/**
 * Unit tests for {@link TimestampBasedIndexBuilder}: default names, configured names, and
 * date-suffixed names with and without an explicit time zone.
 */
public class TestTimestampBasedIndexBuilder {

    private TimestampBasedIndexBuilder timestampBasedIndexBuilder;
    private String index = "es-index";
    private String type = "es-type";
    private String dateFormat = "yyyy-MM-dd-HH";

    @Before
    public void init() throws Exception {
        timestampBasedIndexBuilder = new TimestampBasedIndexBuilder();
    }

    /** An unconfigured builder falls back to the default index and type. */
    @Test
    public void testDefaultIndex() {
        Event emptyEvent = new SimpleEvent();
        assertEquals(DEFAULT_ES_INDEX, timestampBasedIndexBuilder.getIndex(emptyEvent));
        assertEquals(DEFAULT_ES_TYPE, timestampBasedIndexBuilder.getType(emptyEvent));
    }

    /** With a date format and no time zone, the suffix is rendered in the default zone. */
    @Test
    public void testTimestampedIndex() {
        Context context = baseContext();
        context.put(ES_INDEX_BUILDER_DATE_FORMAT, dateFormat);
        timestampBasedIndexBuilder.configure(context);

        // 2019-01-01 13:42:42.000 UTC
        Event event = eventWithHeader("timestamp", "1546350162000");

        String expectedIndexName = index + '-' + "2019-01-01-13";
        assertEquals(expectedIndexName, timestampBasedIndexBuilder.getIndex(event));
    }

    /** With an explicit time zone, the suffix is shifted accordingly. */
    @Test
    public void testTimestampedWithTZIndex() {
        Context context = baseContext();
        context.put(ES_INDEX_BUILDER_DATE_FORMAT, dateFormat);
        context.put(ES_INDEX_BUILDER_DATE_TIME_ZONE, "Europe/Moscow"); // UTC+3
        timestampBasedIndexBuilder.configure(context);

        // 2019-01-01 13:42:42.000 UTC
        Event event = eventWithHeader("@timestamp", "1546350162000");

        String expectedIndexName = index + '-' + "2019-01-01-16";
        assertEquals(expectedIndexName, timestampBasedIndexBuilder.getIndex(event));
    }

    /** Without a date format, the configured index and type are returned unchanged. */
    @Test
    public void testConfigurationIndex() {
        timestampBasedIndexBuilder.configure(baseContext());

        Event emptyEvent = new SimpleEvent();
        assertEquals(index, timestampBasedIndexBuilder.getIndex(emptyEvent));
        assertEquals(type, timestampBasedIndexBuilder.getType(emptyEvent));
    }

    /** Creates a context holding only the test index name and type. */
    private Context baseContext() {
        Context context = new Context();
        context.put(ES_INDEX, index);
        context.put(ES_TYPE, type);
        return context;
    }

    /** Creates an event whose only header is the given name/value pair. */
    private static Event eventWithHeader(String name, String value) {
        Map<String, String> headers = new HashMap<String, String>();
        headers.put(name, value);
        Event event = new SimpleEvent();
        event.setHeaders(headers);
        return event;
    }
}