
Log Analysis System Using Hadoop and MongoDB

I have been searching for a way to process the exponentially increasing service logs at a low cost. For the past several months, the Monitoring System Development Team here at NHN has also been looking for a good way to analyze logs. In this article, I want to tell you what we have come up with.

Every day, many developers and database administrators struggle to find ways to complete their tasks. One such task is statistics. Let's say that an operator (or a producer) asks you to retrieve some statistics data. It will be a relatively easy task if the size of the data is less than a few million rows, or if your organization has a dedicated backup database from which you can retrieve statistics.

But what if you have to retrieve a large amount of data without a backup database?

This will be a challenge for most of us. The Monitoring System Development Team has long been searching for better ways to process work involving statistics, and finally found a solution which involves using MongoDB and Hadoop. In this article, I will show you what we have done and experienced.

Introduction

The team develops and operates monitoring systems such as xMon (uMon, kMon, cMon, jrMon and ocMon are the common names of the monitoring systems used at NHN). The Oracle database used by xMon has a storage capacity of 20 TB. It stores a large amount of data, such as the metadata of the user-generated content (UGC) created on each NAVER service (the body and multimedia data are distributed and stored in OwFS and the video infrastructure) and the related log data for the past six months. The figure below shows the cycle of UGC generation and destruction that produces these logs. The company has been using three Oracle DBMS systems with 128 GB of memory to process the 20 TB of data.

OwFS is a distributed file storage system developed by NHN. Storage is allocated and distributed in units of Owners.

Naver is the major search engine service in Korea.

Figure 1: UGC Generation and Destruction Cycle.

The environment in which the Oracle DBMS was being used was deteriorating as time went by. The amount of UGC was increasing exponentially. We adopted a policy of archiving only six months' worth of data in the database, but even that six-month window kept growing, which became a problem for us. Furthermore, the cost of additional Oracle DBMS storage could not be ignored. The table and figure below show the status and trend of xMon's storage usage.

Table 1: xMon Storage Usage

Classification   Remaining space   Space used   Total space   Usage
xmona            4,593 GB          3,506 GB     8,100 GB      43.29%
xmonb            2,768 GB          5,619 GB     8,387 GB      66.99%
xmonc            1,533 GB          3,018 GB     4,552 GB      66.30%

Figure 2: xMon Storage Usage Trend.

After experiencing these difficulties, the team had to adopt a system better suited than the Oracle DBMS to the following two key requirements.

  • Requirement 1: Be able to extract the user unit index by utilizing the existing log data.
  • Requirement 1-1: The index must be extracted at least once a day.
  • Requirement 2: The extracted user unit index must be immediately available online.
  • Requirement 2-1: The index extraction must be implemented as a query function in the existing system.

To satisfy the first requirement, the index, which is a form of statistical data, must be extracted from the data as quickly as possible. In the company's old system, statistical data was extracted in the following way: a developer would write an SQL query and send it to a database administrator; the administrator would then verify and execute the query and send the result back. As you can imagine, this process was far from speedy.

Requirement 2 can be summarized as "provide index data that can be queried from a Web browser." Under normal circumstances, meeting this requirement is trivial; but when the size of the data to be processed is somewhere in the realm of 4 TB, adding the function to the existing Oracle database is a problem. Adding the index data to that storage would increase the load, which was already hitting the ceiling because of the ever-growing UGC metadata and log data.

Data Aggregation and Extraction by Utilizing Map-Reduce

The table required for index extraction is a generic log table with 30 to 60 columns. Its simple schema is logically related to the master table, but there is no actual relation in the database. With this information at hand, one could easily conclude that either a DBMS or the Map-Reduce approach would be the answer.

In my first attempt, I extracted the index from Oracle by writing an SQL query. Adding a query function to the system is not much of a problem (if performance is of no concern) when there are already dozens of PL/SQL routines and aggregation tables for a variety of statistics data. In my SQL I divided the date range into small chunks, a divide-and-conquer approach. Exactly a week later I got a call from the database administrator: the SQL in question had been running for 860 minutes and had pushed CPU usage up to 60% (30-40% is usual), so the administrator had stopped the process. With that, my first attempt ended in total failure. The following is part of the SQL code actually used in the field to aggregate the amount of unreviewed data for the last month.

INSERT INTO kpi_total_xxxxxx 
          (dd, td,... ) 
 SELECT /*+ index (b CONT_MSTR_CATE_PK) */ SUBSTR(v_start_datehour, 1, 8)  dd, a.td,...,
     SUM(cnt) as cnt,...  FROM (
          SELECT /*+ parallel_index (cm 3) index_ss (cm CONT_MSTR_IDX01) */
                       TO_CHAR(umon_in_tdt, 'YYYYMMDD') td, COUNT(1) as cnt FROM cont_mstr cm
          WHERE  ... AND cm.umon_in_tdt >= TO_DATE(TO_CHAR(
                       ADD_MONTHS(SYSDATE, -1), 'YYYYMM')||'01000000', 'YYYYMMDDHH24MISS')
          GROUP BY TO_DATE(cm.umon_in_tdt, 'YYYYMMDD'), cm.category_id
     ) a, cont_mstr_cate b
  WHERE a.service_code = b.service_code ...  GROUP  BY a.td;

So, I picked Map-Reduce as an alternative. Map-Reduce works in much the same way as the Sort/Merge functions of an RDBMS, and it is well suited to batch data analysis for OLAP (On-line Analytical Processing), which tends to deal with large amounts of data. Most people think of Hadoop when Map-Reduce is mentioned, but other stores such as MongoDB, HBase, CouchDB and Hypertable offer Map-Reduce as well. Among these, I chose two to test and verify: Hadoop for its time-proven performance, and MongoDB for its document-orientedness, which makes saving logs in sequence an easy task.

Because Hadoop's Map-Reduce could not read from Oracle directly, I had to extract the data first in order to verify it. I used Pentaho's Kettle, an open-source ETL (Extract, Transform, Load) tool, to extract the data stored in Oracle. For testing purposes, I used only HDFS and Map-Reduce, two of Hadoop's many features.

I configured three different environments for testing. For Hadoop, I configured the standalone operation and the fully-distributed operation using three DataNodes and one NameNode. Then, I compared the resulting performance with the Map-Reduce of MongoDB.

For your information, the system specifications of the testing machines (each machine is identical) are as follows:

  • CPU: Intel(R) Xeon(R) CPU L5420@2.50GHz * 2 (8 cores)
  • Memory: 32 GB
  • Disk: 814 GB (300 GB * 6 ea, RAID 0 + 1)
  • OS: CentOS 5.2 x86_64

I used one server for the standalone operation of Hadoop, four for the fully-distributed operation, and five for MongoDB. In the fully-distributed operation of Hadoop I separated the NameNode from the DataNodes. This separation was necessary because the JobTracker, the main controller that schedules the Map and Reduce tasks, runs on the NameNode machine, while the TaskTrackers that actually execute those tasks run on the DataNodes. MongoDB provides functions similar to the Map-Reduce of Hadoop; I will explain more about MongoDB later.

The figure below shows the cluster configuration of Hadoop, including the DataNodes and the NameNode. See Figure 6 below for the shard configuration of MongoDB.

Figure 3: Hadoop Cluster Configuration.

As shown in the table and figure below, the standalone operation is suitable for processing small data sets, while the fully-distributed operation is good for processing large ones. Processing small data with the fully-distributed operation is undesirable, because the time it takes to collect the distributed data onto the same nodes during the Reduce phase outweighs the advantage of spreading the data across the nodes during the Map phase. In this test, the fully-distributed operation outperformed the other configurations only at 10 million rows of data. The result varies depending on the amount of data and the system specification, so you must test the configuration before applying it to your system.

Table 2: Comparison of Map-Reduce Performance between Hadoop and MongoDB

Data count     File size   Hadoop (standalone)   Hadoop (distributed)   MongoDB (2 shards)
10,000         2 MB        1 second              23 seconds             1 second
100,000        25 MB       3 seconds             22 seconds             12 seconds
1,000,000      248 MB      20 seconds            29 seconds             65 seconds
10,000,000     2,480 MB    183 seconds           53 seconds             805 seconds
Systems used   -           1 ea                  4 ea                   5 ea

Figure 4: Comparison of Map-Reduce Performance between Hadoop and MongoDB.

Note that when comparing the standalone and the fully-distributed operations, you must compensate for the difference in the number of servers by adjusting the amount of data to be processed. Also note that I did not consider running the standalone operation on four systems, because doing so would require a separate data split and collection process, and therefore additional development resources. Besides, it is simpler to scale out the fully-distributed operation, because you can just add a DataNode without modifying the data split and collection process.

I tested MongoDB as well, in the hope that I could implement both the aggregation system and the index storage with MongoDB alone if its performance was similar to that of Hadoop. To my dismay, the Map-Reduce of MongoDB is quite subpar compared to that of Hadoop, as shown in the table above: with the largest data set, MongoDB was roughly four times slower than even the standalone Hadoop operation, and far behind the fully-distributed one. A newer version of MongoDB has become available since the test, but I doubt it would show a noticeably different performance.

Productivity - What Makes MongoDB Stand Out

When processing small data, MongoDB is still useful, because its productivity outweighs its lack of raw performance on small data sets. The following is a Map-Reduce implementation using the Java API of Hadoop.

// Imports required by the Map and Reduce classes below
// (these go at the top of the enclosing job class's source file):
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Map: emit (userKey, 1) for every input line
public static class DailyIdCountMapClass extends MapReduceBase implements
    Mapper<LongWritable, Text, Text, IntWritable> {
    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String line = value.toString();
        String[] tokens = line.split(UserIdConstants.COLUMN_SEPARATOR);
        String userKey = tokens[0];    // the first column holds the user id
        output.collect(new Text(userKey), new IntWritable(1));
    }
}

// Reduce: sum the 1s emitted for each user key
public static class DailyIdCountReduceClass extends MapReduceBase implements
    Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output,
        Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}
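
For completeness, here is a minimal job driver sketch using the same old org.apache.hadoop.mapred API, assuming the Map and Reduce classes above are nested static members of a class named DailyIdCount; the class name and the HDFS input/output paths are hypothetical, not taken from the actual system.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class DailyIdCount {

    // ... DailyIdCountMapClass and DailyIdCountReduceClass as shown above ...

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(DailyIdCount.class);
        conf.setJobName("daily-id-count");

        conf.setMapperClass(DailyIdCountMapClass.class);
        conf.setCombinerClass(DailyIdCountReduceClass.class); // local pre-aggregation on each node
        conf.setReducerClass(DailyIdCountReduceClass.class);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        // HDFS input and output paths, e.g. /user/xmon/input and /user/xmon/output
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf); // submit the job and wait for completion
    }
}

Such a driver is compiled and packaged into a jar and then submitted to the cluster, which is the maintenance overhead mentioned later in this article.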

The following is the equivalent Map-Reduce written as MongoDB shell commands:

> m = function() { emit(this.user_id, 1); }          // Map
> r = function(k, vals) { return Array.sum(vals); }  // Reduce: sum the per-user counts
> res = db.wklog_user.mapReduce(m, r, { out: { inline: 1 } });

Obviously, the MongoDB commands are far simpler than the Hadoop code. This is because MongoDB's Map-Reduce functions are written in JavaScript and lend themselves to the concise, functional style found in languages such as Scala and Ruby. Of course, Hadoop jobs can also be written more concisely using a scripting language such as Python.

Despite the outstanding performance of Map-Reduce in Hadoop, debugging can be a challenge, because the data is spread across multiple distributed nodes. Maintenance can also be a hassle, because every code change must be compiled and packaged into a jar archive before it can be executed. The good news is that writing the Map-Reduce functions themselves is easy once you fully understand the underlying concepts. You could go the extra mile and fine-tune the job parameters as well, but I will not cover that in this article.

To recap, I tested the data aggregation performance for the following four items:

  1. Oracle DBMS;
  2. the standalone operation of Hadoop;
  3. the fully-distributed operation of Hadoop;
  4. and the Map-Reduce of MongoDB.

Out of the four contenders, the fully-distributed operation of Hadoop proved to be the winner for its speed and scalability.

Selecting Aggregation Information Storage: MongoDB vs. Cassandra

The tests performed so far aimed to find the most efficient Map-Reduce alternative to the existing GROUP BY operations of the RDBMS. Now it was time to select storage in which to store, manage and serve the aggregated data. Oracle was eliminated from consideration from the start, because its scalability is limited.

I chose two test subjects based on popularity. The first is Cassandra, originally developed at Facebook and known for its column-family data model. According to an article, Facebook has since built an HBase-based real-time analysis system that handles some 20 billion events a day, which means Facebook uses HBase for both Map-Reduce and storage. I wish I could have benchmarked HBase in this article as well.

The other test subject is MongoDB, which I had already paired with Hadoop for the Map-Reduce performance comparison. MongoDB uses an intuitive document-oriented data model and offers the flexibility in field management that is one of the advantages of schema-free databases. In addition, MongoDB can index any field in a document, enabling quick searches against all documents in a collection or against a specific indexed field. MongoDB has a reputation for good performance and reliability.

As the table below shows, the read performance of Cassandra is clearly inferior to that of MongoDB. My colleague from the Storage System Development Team at NHN has written a whole article on this topic, NoSQL Benchmarking; refer to it for a deeper comparison of the READ and WRITE performance of Cassandra and MongoDB.

After reviewing the code and related documents, I concluded that the philosophical difference between the two solutions leads to the difference in performance. Because MongoDB is a document-oriented model, it writes one row of the database with a single insert operation. Cassandra, a column-based model, writes one column with each insert operation, so storing one row requires one insert per column. For this reason, Cassandra is slower than MongoDB for this workload. The test data consists of 30 columns, so I expected roughly a 30-fold difference; the actual difference was about 300-fold.

The following code shows that the insert operation must be repeated as many times as the number of columns.

// One ColumnPath and one insert call are needed for every column of the row.
ColumnPath columnPath1 = new ColumnPath(columnFamily);
columnPath1.setColumn(columnName1.getBytes());
client.insert("Keyspace1", key, columnPath1, columnValue1.getBytes(), timestamp, ConsistencyLevel.ONE);
<omitted>
// ...and so on, up to the N-th column.
ColumnPath columnPathN = new ColumnPath(columnFamily);
columnPathN.setColumn(columnNameN.getBytes());
client.insert("Keyspace1", key, columnPathN, columnValueN.getBytes(), timestamp, ConsistencyLevel.ONE);
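
For contrast, the following is a minimal sketch of the equivalent write with the MongoDB Java driver of that era (the 2.x API), where the entire row becomes one document and one insert call. The host, database, and field names other than user_id are hypothetical.

import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.Mongo;

public class MongoRowInsertExample {
    public static void main(String[] args) throws Exception {
        Mongo mongo = new Mongo("localhost", 27017);
        DBCollection coll = mongo.getDB("test").getCollection("wklog_user");

        // All 30 columns of a row become fields of a single document...
        BasicDBObject doc = new BasicDBObject("user_id", "u12345");
        doc.append("service_code", "blog");
        doc.append("category_id", 100);
        // ... remaining fields omitted ...

        // ...and the whole document is written with one insert call.
        coll.insert(doc);
        mongo.close();
    }
}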

To confirm that the column-based insert operation caused the poor write performance, we reduced the number of columns in the test: by combining all columns other than the key into one, we brought the number of columns down to two. As shown in Table 3, writing 100,000 rows in Cassandra was significantly faster with two columns than with 30. Still, it could not surpass the write performance of MongoDB.

Table 3: Comparison of Write/Read Performance of MongoDB vs. Cassandra

Test          Criteria   MongoDB          Cassandra (30 columns)   Cassandra (2 columns)
Write         100,000    3.551 seconds    1,057.437 seconds        32.980 seconds
Read (all)    100,000    54.362 seconds   Not measurable (no response after waiting for 1 hour)
Random read   Searched   1.162 seconds    -                        -
CPU usage     -          3%               20%                      15%

For reference, the schema structure used for the Cassandra performance test is shown in the following figure. We configured the columns freely in order to include all aggregation information related to one user (keyed by user_id). Designing it as a hierarchical structure made it easy to add data items to each layer and allowed a variable schema design, which we thought would be a benefit. However, the test results showed poor performance, so we had to discard it.

Figure 5: Cassandra Schema.

If Cassandra had been tested in a fully-distributed configuration, the read performance for the same key would likely have been better than shown here. However, a test with N nodes was excluded from the scope due to our internal circumstances. Therefore, for a consistent comparison, we configured MongoDB as a standalone operation as well.

For reference, MongoDB supports write-ahead journaling from version 1.7.5 for recovery from errors and for the durability of the storage engine. Enabling this journaling option improves stability, but it worsens MongoDB's measured performance. Since the aggregate data can easily be regenerated, we excluded this option from the test scope.

Apart from the test results above, there was another important reason why we selected MongoDB: it provides composite (compound) indexes. To serve the various query conditions against the aggregate data, we needed to define multiple indexes, and MongoDB can create an index over data of one million cases in just one second. In addition, users familiar with relational databases can pick up MongoDB more easily, since it is conceptually similar to SQL, whereas using Cassandra requires understanding its key-value based tree structure.
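
As an illustration, here is a minimal sketch of creating and querying such a composite index with the MongoDB Java driver (2.x API); the database, collection, and field names are hypothetical, not the ones used in our system.

import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.Mongo;

public class CompositeIndexExample {
    public static void main(String[] args) throws Exception {
        Mongo mongo = new Mongo("localhost", 27017);
        DBCollection coll = mongo.getDB("xmon_stats").getCollection("user_index");

        // Composite (compound) index over the two fields that are queried together.
        coll.ensureIndex(new BasicDBObject("user_id", 1).append("dd", 1));

        // A query on both fields can now be served by that index.
        DBCursor cursor = coll.find(new BasicDBObject("user_id", "u12345").append("dd", "20120101"));
        while (cursor.hasNext()) {
            System.out.println(cursor.next());
        }
        mongo.close();
    }
}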
Based on the performance tests of MongoDB and Cassandra, we selected MongoDB, considering both its batch write performance and its performance when reading various columns. The write/read performance test was carried out in standalone operation; however, the final MongoDB configuration was determined to be five servers, as shown in the following figure.

Figure 6: MongoDB Cluster Configuration.

It took two attempts to arrive at the configuration shown above. At first we configured the cluster with four mongod config servers, without the arbiter server, but the cluster did not operate: the configuration documentation specifies that one or three config servers must be used, a point we had missed. In addition, when a shard configuration spans several servers, an incorrect setup or a network failure can compromise data integrity. To address this, an additional arbiter server (cbat005 in the figure) must be configured.

Final Hadoop & MongoDB Based Aggregation System

The following figure shows the final configuration of the system. The system could have been configured with a standalone Hadoop server and a standalone MongoDB; however, we configured it for easy expansion, since the data volume is continuously increasing. Data flows in the order of Oracle, ETL, Hadoop, and MongoDB.

Figure 7: System Configuration.

The operation scenario for the above configuration is as follows.

  1. Select the target data to aggregate from the database.
  2. Extract the data by using the ETL tool.
  3. Balance and load the data onto the HDFS DataNodes of the Hadoop cluster (see the sketch after this list).
  4. Execute Map-Reduce in the NameNode of the Hadoop Cluster.
  5. The aggregation result is transferred and loaded into MongoDB by using the transfer application. At this step, an L4 switch is used for load balancing.
  6. Data loaded into MongoDB is provided to users via external modules.
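
As a small illustration of step 3, the sketch below loads an extracted log file into HDFS using Hadoop's FileSystem API; HDFS then distributes and replicates the blocks across the DataNodes. The NameNode address and the file paths are hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsLoadExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Point the client at the NameNode (hypothetical host and port).
        conf.set("fs.default.name", "hdfs://namenode:9000");

        FileSystem fs = FileSystem.get(conf);
        // Copy the file produced by the ETL step from local disk into HDFS;
        // HDFS itself balances and replicates the blocks across the DataNodes.
        fs.copyFromLocalFile(new Path("/data/etl/wklog_user.txt"),
                             new Path("/user/xmon/input/wklog_user.txt"));
        fs.close();
    }
}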

The data flows naturally from the first through the third step. In the fourth step, aggregation is executed with Map-Reduce, and in the fifth the aggregation result is saved through the API provided by MongoDB. When accessing MongoDB, it is recommended to distribute the load through an L4 switch; alternatively, the list of mongos servers can be specified in the client code, or RequestBroker can be used. Once the first through fifth steps are complete, users can query the data. As a supplementary note on how data is stored in MongoDB: the client determines, through the config servers, in which shard the data will be stored; the data is then written to the master server of that shard and copied to its replication server. MongoDB uses the arbiter to elect a new master from the remaining servers when one of the distributed servers fails.
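
The following is a rough sketch, with hypothetical host, database, collection and field names, of how a client might connect to a list of mongos routers (the alternative to going through the L4 switch) and store one aggregated row as one document, using the MongoDB Java driver of that era (2.x API).

import java.util.Arrays;

import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.Mongo;
import com.mongodb.ServerAddress;

public class AggregationResultLoader {
    public static void main(String[] args) throws Exception {
        // Connect through a list of mongos routers instead of a single host.
        Mongo mongo = new Mongo(Arrays.asList(
                new ServerAddress("mongos01", 27017),
                new ServerAddress("mongos02", 27017)));

        DBCollection coll = mongo.getDB("xmon_stats").getCollection("user_index");

        // One aggregated result row becomes one document; the config servers decide
        // which shard it lands on, and the shard's master replicates it to its replica.
        coll.insert(new BasicDBObject("user_id", "u12345")
                .append("dd", "20120101")
                .append("cnt", 42));

        mongo.close();
    }
}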

Conclusion

We have accomplished the first goal by configuring the system as described. We recommend you consider the following if you are planning to introduce a Map-Reduce or NoSQL solution.

If the data volume is not particularly high but Map-Reduce must be applied, configure a Hadoop standalone operation. If the data volume is large, configure a fully-distributed operation consisting of at least three DataNodes. If you are already using MongoDB and the data volume is not particularly high, MongoDB's Map-Reduce will be sufficient.

For NoSQL storage, choose MongoDB if you need multiple or composite indexes. If the data items change often and access is mostly by unique key, Cassandra will be the better fit. Since we have not yet tested HBase, an additional review of it is also required.

After accomplishing the first goal, we are considering the second: establishing long-term measures that can effectively handle the continuously increasing Oracle data. Our idea is to move the log data that is loosely coupled to the master data and queried infrequently into NoSQL storage. If this can be done with a NoSQL solution, we expect to reduce the cost of the relatively expensive Oracle storage.

NoSQL solutions keep advancing to meet various needs. The biggest advantage of NoSQL that I have experienced is scale-out, and the relatively fast write/read performance is a bonus. However, there is no such thing as a free lunch; you can lose precious development time along the way.

I wanted to share our NoSQL experience in the hope that it will be helpful to you. This is the first article I have posted on this site, so I am concerned that my writing may not convey our experience as well as I would like. I hope more information on NoSQL solutions can be shared in the future.

By Jeong Hyun Lee, Web Service Development Team, NHN Corporation.


