
Ariel G. asked:

Char encoding issue between Real Time BT Feed and Flume

Hi everyone,

I'm setting up a Flume instance to receive data from the Dynatrace server via the Real Time Business Transactions Feed, acting as a bridge that forwards it to Elasticsearch.

I managed to store BTs in ES, but the messages are full of unreadable characters. Examples can be found in the Flume log files, such as the following:

21 abr 2017 10:50:08,509 DEBUG [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.http.impl.client.DefaultRequestDirector.tryExecute:683)  - Attempt 1 to execute request
21 abr 2017 10:50:08,510 DEBUG [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.http.impl.conn.DefaultClientConnection.sendRequestHeader:268)  - Sending request: POST /_bulk HTTP/1.1
21 abr 2017 10:50:08,511 DEBUG [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.http.impl.conn.Wire.wire:72)  -  >> "POST /_bulk HTTP/1.1[\r][\n]"
21 abr 2017 10:50:08,512 DEBUG [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.http.impl.conn.Wire.wire:72)  -  >> "Content-Length: 757[\r][\n]"
21 abr 2017 10:50:08,513 DEBUG [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.http.impl.conn.Wire.wire:72)  -  >> "Content-Type: text/plain; charset=ISO-8859-1[\r][\n]"
21 abr 2017 10:50:08,513 DEBUG [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.http.impl.conn.Wire.wire:72)  -  >> "Host: <host_key>.eu-west-1.es.amazonaws.com:80[\r][\n]"
21 abr 2017 10:50:08,513 DEBUG [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.http.impl.conn.Wire.wire:72)  -  >> "Connection: Keep-Alive[\r][\n]"
21 abr 2017 10:50:08,514 DEBUG [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.http.impl.conn.Wire.wire:72)  -  >> "User-Agent: Apache-HttpClient/4.5 (Java/1.8.0_102)[\r][\n]"
21 abr 2017 10:50:08,514 DEBUG [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.http.impl.conn.Wire.wire:72)  -  >> "[\r][\n]"
21 abr 2017 10:50:08,515 DEBUG [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.http.impl.conn.DefaultClientConnection.sendRequestHeader:272)  - >> POST /_bulk HTTP/1.1
21 abr 2017 10:50:08,515 DEBUG [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.http.impl.conn.DefaultClientConnection.sendRequestHeader:275)  - >> Content-Length: 757
21 abr 2017 10:50:08,515 DEBUG [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.http.impl.conn.DefaultClientConnection.sendRequestHeader:275)  - >> Content-Type: text/plain; charset=ISO-8859-1
21 abr 2017 10:50:08,516 DEBUG [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.http.impl.conn.DefaultClientConnection.sendRequestHeader:275)  - >> Host: <host_key>.eu-west-1.es.amazonaws.com:80
21 abr 2017 10:50:08,516 DEBUG [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.http.impl.conn.DefaultClientConnection.sendRequestHeader:275)  - >> Connection: Keep-Alive
21 abr 2017 10:50:08,517 DEBUG [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.http.impl.conn.DefaultClientConnection.sendRequestHeader:275)  - >> User-Agent: Apache-HttpClient/4.5 (Java/1.8.0_102)
21 abr 2017 10:50:08,517 DEBUG [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.http.impl.conn.Wire.wire:72)  -  >> "{"index":{"_index":"pp-2017-04-21","_type":"item"}}[\n]"
21 abr 2017 10:50:08,518 DEBUG [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.http.impl.conn.Wire.wire:72)  -  >> "{"@message":"\n!Motor Hotel Valoracion Reserva PP\u0010\u0000\u001A\u000Efront terceras*\u001FMotor Hotel Prebook - Proveedor*\u001DMotor Hotel Prebook - Destino2h\b?????+*!PT=78408;PA=1461627692;PS=3000787:\u0003PAV:\u0003QXUy\u0000\u0000\u0000?F??@?\u0001`u???\u0019b@?\u0001\u0000\u0000\u0000??:@?\u0001\u0000?????@?\u0001\u0000\u0000\u0000?F??@?\u0001\u00002i\b?????+*!PT=78431;PA=1461627692;PS=3000787:\u0003BAR:\u00046679y\u0000\u0000\u0000?d ?@?\u0001fg?s??\\@?\u0001\u0000\u0000`}???@?\u0001\u0000\u0000?p?\b?@?\u0001\u0000\u0000\u0000\u0000tL?@?\u0001\u0000:\nProduction","@fields":{"btType":"PUREPATH","server":"apm1","btName":"Motor Hotel Valoracion Reserva PP","systemProfile":"Production"}}[\n]"

I would like to know if someone else has dealt with this issue.

This is the Flume configuration (based on the Big Data Business Transaction Bridge):

# Name the components on this agent
agent1.sources = HTTPSource
agent1.sinks = PurePathSink UserActionSink VisitSink
agent1.channels = PurePathChannel UserActionChannel VisitChannel

# Describe/configure HTTPSource
agent1.sources.HTTPSource.type = org.apache.flume.source.http.HTTPSource
agent1.sources.HTTPSource.port = 4321
agent1.sources.HTTPSource.handler = com.dynatrace.diagnostics.btexport.flume.BtExportHandler

# Describe sinks
agent1.sinks.PurePathSink.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
agent1.sinks.PurePathSink.client = rest
agent1.sinks.PurePathSink.hostNames = <host_key>.eu-west-1.es.amazonaws.com:80
agent1.sinks.PurePathSink.indexName = pp
agent1.sinks.PurePathSink.indexType = item
agent1.sinks.PurePathSink.clusterName = <cluster>
agent1.sinks.PurePathSink.batchSize = 100
agent1.sinks.PurePathSink.serializer.charset = ISO-8859-1
agent1.sinks.PurePathSink.serializerBuilder = com.dynatrace.diagnostics.btexport.flume.BtPurePathSerializerBuilder
agent1.sinks.PurePathSink.indexNameBuilder = org.apache.flume.sink.elasticsearch.TimeBasedIndexNameBuilder

agent1.sinks.UserActionSink.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
agent1.sinks.UserActionSink.hostNames = <host_key>.eu-west-1.es.amazonaws.com:80
agent1.sinks.UserActionSink.client  = rest
agent1.sinks.UserActionSink.indexName = ua
agent1.sinks.UserActionSink.indexType = item
agent1.sinks.UserActionSink.clusterName = <cluster>
agent1.sinks.UserActionSink.batchSize = 100
agent1.sinks.UserActionSink.serializer.charset = ISO-8859-1
agent1.sinks.UserActionSink.serializerBuilder = com.dynatrace.diagnostics.btexport.flume.BtPageActionSerializerBuilder
agent1.sinks.UserActionSink.indexNameBuilder = org.apache.flume.sink.elasticsearch.TimeBasedIndexNameBuilder

agent1.sinks.VisitSink.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
agent1.sinks.VisitSink.hostNames = <host_key>.eu-west-1.es.amazonaws.com:80
agent1.sinks.VisitSink.client = rest
agent1.sinks.VisitSink.indexName = vt
agent1.sinks.VisitSink.indexType = item
agent1.sinks.VisitSink.clusterName = <cluster>
agent1.sinks.VisitSink.batchSize = 100
agent1.sinks.VisitSink.serializer.charset = ISO-8859-1
agent1.sinks.VisitSink.serializerBuilder = com.dynatrace.diagnostics.btexport.flume.BtVisitSerializerBuilder
agent1.sinks.VisitSink.indexNameBuilder = org.apache.flume.sink.elasticsearch.TimeBasedIndexNameBuilder

# Use a channel which buffers events in memory
agent1.channels.PurePathChannel.type = memory
agent1.channels.PurePathChannel.capacity = 1000
agent1.channels.PurePathChannel.transactionCapacity = 100

agent1.channels.UserActionChannel.type = memory
agent1.channels.UserActionChannel.capacity = 1000
agent1.channels.UserActionChannel.transactionCapacity = 100

agent1.channels.VisitChannel.type = memory
agent1.channels.VisitChannel.capacity = 1000
agent1.channels.VisitChannel.transactionCapacity = 100

# Bind the source and sink to the channel
agent1.sources.HTTPSource.channels = PurePathChannel UserActionChannel VisitChannel
agent1.sinks.PurePathSink.channel = PurePathChannel
agent1.sinks.UserActionSink.channel = UserActionChannel
agent1.sinks.VisitSink.channel = VisitChannel

agent1.sources.HTTPSource.selector.type = multiplexing
agent1.sources.HTTPSource.selector.header = btType
agent1.sources.HTTPSource.selector.mapping.PUREPATH = PurePathChannel
agent1.sources.HTTPSource.selector.mapping.PAGE_ACTION = UserActionChannel
agent1.sources.HTTPSource.selector.mapping.USER_ACTION = UserActionChannel
agent1.sources.HTTPSource.selector.mapping.VISIT = VisitChannel
agent1.sources.HTTPSource.selector.default = PurePathChannel
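
For readers less familiar with Flume's multiplexing selector: the selector.* lines above route each event by its btType header, and an event whose header is missing or unmapped goes to the default channel. The plain-Java model below sketches that lookup; the class BtTypeRouting is hypothetical and is only a rough model of the configured behavior, not Flume's own selector implementation.

```java
import java.util.HashMap;
import java.util.Map;

public class BtTypeRouting {

    // Mirrors the selector.mapping.* entries of agent1 (hypothetical model, not Flume code).
    static final Map<String, String> MAPPING = new HashMap<>();
    static {
        MAPPING.put("PUREPATH", "PurePathChannel");
        MAPPING.put("PAGE_ACTION", "UserActionChannel");
        MAPPING.put("USER_ACTION", "UserActionChannel");
        MAPPING.put("VISIT", "VisitChannel");
    }

    // Mirrors selector.default: used when btType is absent or has no mapping.
    static final String DEFAULT_CHANNEL = "PurePathChannel";

    /** Returns the channel an event with these headers would be routed to. */
    static String route(Map<String, String> headers) {
        String channel = MAPPING.get(headers.get("btType"));
        return (channel != null) ? channel : DEFAULT_CHANNEL;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("btType", "PAGE_ACTION");
        System.out.println(route(headers));
        headers.put("btType", "SOMETHING_ELSE");
        System.out.println(route(headers));
    }
}
```

Note that PAGE_ACTION and USER_ACTION share one channel, so both end up in the ua index.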

Thanks.


1 Answer

Ariel G. answered:

Issue solved.

The config entry "agent1.sinks.PurePathSink.serializerBuilder" does not work, so the sink falls back to the default serializer, which sends the request data to ES as is.
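
A cheap guard against this kind of silent mistake is to check sink and channel keys against the option names Flume documents. The sketch below does that; the class FlumeKeyLint is hypothetical, and the two whitelists are our reading of the Flume User Guide entries for the ElasticSearchSink and the memory channel, so they may be incomplete for other Flume versions. Only the segment right after the component name is checked, so sub-options such as serializer.charset pass.

```java
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

public class FlumeKeyLint {

    // Assumption: option names as we read them in the Flume User Guide (may be incomplete).
    static final Set<String> SINK_KEYS = new HashSet<>(Arrays.asList(
            "type", "channel", "hostNames", "indexName", "indexType", "clusterName",
            "batchSize", "ttl", "serializer", "indexNameBuilder", "client"));
    static final Set<String> CHANNEL_KEYS = new HashSet<>(Arrays.asList(
            "type", "capacity", "transactionCapacity", "keep-alive",
            "byteCapacityBufferPercentage", "byteCapacity"));

    /** Returns sink/channel keys whose option segment is not in the whitelist, sorted. */
    static Set<String> unknownKeys(Properties props) {
        Set<String> unknown = new TreeSet<>();
        for (String key : props.stringPropertyNames()) {
            // Expected shape: <agent>.<sinks|channels>.<component>.<option>[.<sub-option>]
            String[] parts = key.split("\\.");
            if (parts.length < 4) continue;
            String option = parts[3];
            if (parts[1].equals("sinks") && !SINK_KEYS.contains(option)) unknown.add(key);
            if (parts[1].equals("channels") && !CHANNEL_KEYS.contains(option)) unknown.add(key);
        }
        return unknown;
    }

    /** Convenience overload for configuration text held in memory. */
    static Set<String> unknownKeys(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for a StringReader
        }
        return unknownKeys(props);
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        try (Reader in = new FileReader(args[0])) { // path to the agent's .properties file
            props.load(in);
        }
        for (String key : unknownKeys(props)) {
            System.out.println("Unrecognized option: " + key);
        }
    }
}
```

Against the configuration in the question we would expect it to flag the three serializerBuilder keys (the documented option for a custom serializer class is serializer), and it would also catch misspellings such as transactionCapactiy for transactionCapacity.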

The request data is in a format that can be parsed by com.dynatrace.diagnostics.core.realtime.export.BtExport.BusinessTransaction (available in the Big Data Business Transaction Bridge). However, the serializer provided there does not work for sending that data to ES.
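
One way to confirm that the body is a binary message rather than text in the wrong charset is to walk its protobuf wire format by hand: the visible start of the @message value in the question's log decodes cleanly that way. Below is a minimal, dependency-free sketch; the class WireDump is hypothetical, it reads only top-level fields, and it assumes the body is protobuf (which is what BusinessTransaction.parseFrom expects). The ? characters further along the log line mean bytes were already lost there, so a real check should run on the raw event.getBody().

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class WireDump {

    // Start of the "@message" value from the Flume log; its JSON escapes are written here
    // as Java octal escapes: \020 = 0x10, \000 = 0x00, \032 = 0x1A, \016 = 0x0E, \037 = 0x1F, \035 = 0x1D.
    static final String LOG_PREFIX = "\n!Motor Hotel Valoracion Reserva PP\020\000\032\016front terceras"
            + "*\037Motor Hotel Prebook - Proveedor*\035Motor Hotel Prebook - Destino";

    /** Reads a base-128 varint at pos[0] and advances pos[0] past it. */
    static long readVarint(byte[] buf, int[] pos) {
        long result = 0;
        for (int shift = 0; shift < 64; shift += 7) {
            if (pos[0] >= buf.length) throw new IllegalArgumentException("truncated varint");
            byte b = buf[pos[0]++];
            result |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) return result;
        }
        throw new IllegalArgumentException("varint too long");
    }

    /** Lists top-level fields as "number:kind=value", stopping at anything it cannot skip. */
    static List<String> dump(byte[] buf) {
        List<String> out = new ArrayList<>();
        int[] pos = {0};
        while (pos[0] < buf.length) {
            long key = readVarint(buf, pos);
            int field = (int) (key >>> 3);
            int wireType = (int) (key & 0x7);
            if (wireType == 0) {                         // varint
                out.add(field + ":varint=" + readVarint(buf, pos));
            } else if (wireType == 2) {                  // length-delimited: string, bytes or nested message
                int len = (int) readVarint(buf, pos);
                if (len < 0 || pos[0] + len > buf.length) {
                    out.add(field + ":truncated");
                    break;
                }
                out.add(field + ":bytes=\"" + new String(buf, pos[0], len, StandardCharsets.UTF_8) + "\"");
                pos[0] += len;
            } else if (wireType == 1 || wireType == 5) { // fixed64 / fixed32: skip the payload
                pos[0] += (wireType == 1) ? 8 : 4;
                out.add(field + ":fixed" + ((wireType == 1) ? 64 : 32));
            } else {                                     // groups (3/4) or invalid: give up
                out.add(field + ":wiretype" + wireType + " (stopped)");
                break;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // In a serializer, the input would be event.getBody() instead of this log excerpt.
        for (String line : dump(LOG_PREFIX.getBytes(StandardCharsets.ISO_8859_1))) {
            System.out.println(line);
        }
    }
}
```

On the log excerpt this prints five fields: field 1 with "Motor Hotel Valoracion Reserva PP", field 2 as varint 0, field 3 with "front terceras", and field 5 twice ("Motor Hotel Prebook - Proveedor" and "Motor Hotel Prebook - Destino"). Every length prefix lines up exactly with its string (for example 0x21 = 33 characters for the first one), and that first string equals the btName header shown in @fields. This is why changing charsets alone cannot make the message readable; it has to be parsed.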

Once we understood what was happening, we wrote a new serializer that parses the request with BtExport.BusinessTransaction (which does the heavy lifting) and then builds the JSON ES message from the parsed fields.

The goal of this solution is to send business transactions to ES through Flume, without any other layer in between.

Here is part of the solution:

import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer;
import org.elasticsearch.common.io.BytesStream;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.dynatrace.diagnostics.core.realtime.export.BtExport.BusinessTransaction;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.Message;

public class BTSerializer implements ElasticSearchEventSerializer {

	private static final Logger log = LoggerFactory.getLogger(BTSerializer.class);

	@Override
	public void configure(Context context) {
		// No configurable properties yet
	}

	@Override
	public void configure(ComponentConfiguration conf) {
		// No configurable properties yet
	}

	@Override
	public BytesStream getContentBuilder(Event event) throws IOException {
		// Parse the raw (protobuf) request body into a BusinessTransaction
		BusinessTransaction parsedBt = BusinessTransaction.parseFrom(event.getBody());

		XContentBuilder builder = XContentFactory.jsonBuilder().startObject();

		// ES body message: the full parsed content
		builder.field("@message", parsedBt.getAllFields().toString());

		// ES keys, nested under "@fields"
		appendIndexes(builder.startObject("@fields"), event, parsedBt).endObject();
		builder.endObject();

		return builder;
	}

	private XContentBuilder appendIndexes(XContentBuilder content, Event event, BusinessTransaction parsedBt) throws IOException {

		// Every event header becomes an ES key
		for (Map.Entry<String, String> header : event.getHeaders().entrySet()) {
			content.field(header.getKey(), header.getValue());
		}

		try {
			for (Map.Entry<FieldDescriptor, Object> entry : parsedBt.getAllFields().entrySet()) {
				String fieldName = entry.getKey().getName();
				Object value = entry.getValue();

				if ("application".equals(fieldName)) {
					// The application name becomes an ES key
					content.field(fieldName, value.toString());
				} else if ("occurrences".equals(fieldName)) {
					// At most as many occurrences arrive here as the configured bulk size.
					// It must be set to 1 for now, because each occurrence writes its own
					// "timestamp" key (improvement pending).
					List<?> occurrences = (List<?>) value;

					DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
					df.setTimeZone(TimeZone.getDefault());

					for (Object occurrence : occurrences) {
						Message singleMsg = (Message) occurrence;

						// Desired BT fields can be handled here
						for (Map.Entry<FieldDescriptor, Object> fieldEntry : singleMsg.getAllFields().entrySet()) {
							// startTime (epoch millis) becomes the ES timestamp key
							if ("startTime".equals(fieldEntry.getKey().getName())) {
								content.field("timestamp", df.format(new Date((Long) fieldEntry.getValue())));
							}
						}
					}
				}
			}
		} catch (Exception e) {
			log.error("Something went wrong indexing the ES message", e);
		}

		return content;
	}
}

We are still working on it.
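
One possible direction for the pending improvement, sketched under assumptions (the helper class BtTimestamps is hypothetical and not part of the bridge): format startTime with Java 8's java.time, whose DateTimeFormatter is immutable and thread-safe, and write a single timestamp per document from the earliest occurrence instead of one per occurrence. The pattern uses xxx, which yields an ISO 8601 offset with a colon (+02:00) rather than the +0200 that the Z pattern letter produces in SimpleDateFormat.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Collection;
import java.util.OptionalLong;

public class BtTimestamps {

    // DateTimeFormatter is immutable and thread-safe, so one shared instance serves every event.
    private static final DateTimeFormatter ES_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSxxx");

    /** Formats epoch milliseconds (e.g. an occurrence's startTime) in the given zone. */
    static String toEsTimestamp(long epochMillis, ZoneId zone) {
        return ES_FORMAT.format(Instant.ofEpochMilli(epochMillis).atZone(zone));
    }

    /** Earliest start time across all occurrences, so "timestamp" is written only once. */
    static OptionalLong earliestStart(Collection<Long> startTimes) {
        return startTimes.stream().mapToLong(Long::longValue).min();
    }
}
```

Inside appendIndexes, one would collect each occurrence's startTime into a list and, after the loop, call content.field("timestamp", BtTimestamps.toEsTimestamp(min, ZoneId.systemDefault())) once if earliestStart returns a value; ZoneId.systemDefault() matches the original TimeZone.getDefault(). Whether the earliest occurrence is the right one to use depends on how the index is queried.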
