Friday, December 7, 2012

Java Serialization - Good, Fast, and Faster

Probably anyone who has ever worked with serialization of objects, be that in Java or any other language, knows that it should be avoided whenever possible. Just like the first rule of distribution is "Do not distribute!", the first rule of serialization should be "Do not serialize!". However, in many cases, especially in distributed environments, serialization cannot be avoided and therefore must be significantly optimized to achieve any kind of reasonable throughput.

At GridGain, given the distributed nature of our product, we have always worked on optimizing our serialization routines, and starting with version 4.3.0 we have achieved our fastest results so far. In our tests, GridOptimizedMarshaller was up to 20x faster than standard Java serialization with java.io.Serializable. If you switch to java.io.Externalizable, the GridGain marshaller is still up to 10x faster. We have even compared our marshaller to Kryo serialization, and it turns out that ours is up to 5x faster than Kryo. On top of that, the footprint of GridGain serialized objects is significantly smaller than that of standard Java serialization.

The coolest thing here is that we do not require any custom interfaces or APIs - GridGain optimized serialization works directly with standard Java POJOs, regardless of whether they implement the java.io.Serializable interface or not. If your POJOs implement java.io.Externalizable, then our marshaling works even faster.

How do we do it? The main culprit in Java serialization is java.io.ObjectOutputStream, which is extremely expensive to initialize and performs poorly. The first thing we did was replace it with our own implementation, based on direct memory copying by invoking native C and Java so-called "unsafe" routines. We also serialize fields in a predefined order by doing lots of object introspection, which allows us to pass only values and not their type names or other metadata.
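
To give a feel for how much the metadata costs, here is a minimal sketch using only the JDK. It is not the GridOptimizedMarshaller code and does not use "unsafe" routines; the Point class and both helper methods are hypothetical names made up for this illustration. It compares default Java serialization with a hand-written encoding that writes only field values in a fixed order.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class ValuesOnlyEncodingSketch {
    /** Hypothetical sample type, used only for this illustration. */
    static class Point implements Serializable {
        private static final long serialVersionUID = 1L;

        final int x;
        final int y;
        final long ts;

        Point(int x, int y, long ts) {
            this.x = x;
            this.y = y;
            this.ts = ts;
        }
    }

    /** Default Java serialization: a stream header and class descriptor travel with the values. */
    static byte[] javaSerialize(Point p) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();

        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(p);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        return bytes.toByteArray();
    }

    /** Values-only encoding: fields are written in a fixed order, with no names or type information. */
    static byte[] valuesOnly(Point p) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();

        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(p.x);
            out.writeInt(p.y);
            out.writeLong(p.ts);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        Point p = new Point(1, 2, 3L);

        System.out.println("Java serialization: " + javaSerialize(p).length + " bytes");
        System.out.println("Values only: " + valuesOnly(p).length + " bytes");
    }
}
```

The values-only form is always 16 bytes here (two ints and a long), but it only works because the reader knows the field order in advance, which is the role object introspection plays in the description above.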

Here are the results from the test on my MacBookPro 2.7 GHz Intel i7:
>>> Java serialization via Externalizable (average): 22,551 ms
>>> Kryo serialization (average): 17,300 ms
>>> GridGain serialization (average): 2,937 ms
Here is the test itself. This test is included with our product, so feel free to download it and try it for yourself.
public class SerializationBenchmark {
    /** Number of runs. */
    private static final int RUN_CNT = 3;

    /** Number of iterations. */
    private static final int ITER_CNT = 200000;

    public static void main(String[] args) throws Exception {
        // Create sample object.
        SampleObject obj = createObject();

        // Run Java serialization test.
        javaSerialization(obj);

        // Run Kryo serialization test.
        kryoSerialization(obj);

        // Run GridGain serialization test.
        gridGainSerialization(obj);
    }

    private static long javaSerialization(SampleObject obj) throws Exception {
        long avgDur = 0;

        for (int i = 0; i < RUN_CNT; i++) {
            SampleObject newObj = null;

            long start = System.currentTimeMillis();

            for (int j = 0; j < ITER_CNT; j++) {
                ByteArrayOutputStream out = new ByteArrayOutputStream();

                ObjectOutputStream objOut = null;

                try {
                    objOut = new ObjectOutputStream(out);

                    objOut.writeObject(obj);
                }
                finally {
                    U.close(objOut, null);
                }

                ObjectInputStream objIn = null;

                try {
                    objIn = new ObjectInputStream(
                        new ByteArrayInputStream(out.toByteArray()));

                    newObj = (SampleObject)objIn.readObject();
                }
                finally {
                    U.close(objIn, null);
                }
            }

            long dur = System.currentTimeMillis() - start;

            avgDur += dur;
        }

        avgDur /= RUN_CNT;

        System.out.format("\n>>> Java serialization via Externalizable (average): %,d ms\n\n", avgDur);

        return avgDur;
    }

    private static long kryoSerialization(SampleObject obj) throws Exception {
        Kryo marsh = new Kryo();

        long avgDur = 0;

        for (int i = 0; i < RUN_CNT; i++) {
            SampleObject newObj = null;

            long start = System.currentTimeMillis();

            for (int j = 0; j < ITER_CNT; j++) {
                ByteArrayOutputStream out = new ByteArrayOutputStream();

                Output kryoOut = null;

                try {
                    kryoOut = new Output(out);

                    marsh.writeObject(kryoOut, obj);
                }
                finally {
                    U.close(kryoOut, null);
                }

                Input kryoIn = null;

                try {
                    kryoIn = new Input(new ByteArrayInputStream(out.toByteArray()));

                    newObj = marsh.readObject(kryoIn, SampleObject.class);
                }
                finally {
                    U.close(kryoIn, null);
                }
            }

            long dur = System.currentTimeMillis() - start;

            avgDur += dur;
        }

        avgDur /= RUN_CNT;

        System.out.format("\n>>> Kryo serialization (average): %,d ms\n\n", avgDur);

        return avgDur;
    }

    private static long gridGainSerialization(SampleObject obj) throws Exception {
        GridMarshaller marsh = new GridOptimizedMarshaller(false, 
            Arrays.asList(SampleObject.class.getName()), null);

        long avgDur = 0;

        for (int i = 0; i < RUN_CNT; i++) {
            SampleObject newObj = null;

            long start = System.currentTimeMillis();

            for (int j = 0; j < ITER_CNT; j++)
                newObj = marsh.unmarshal(marsh.marshal(obj), null);

            long dur = System.currentTimeMillis() - start;

            avgDur += dur;
        }

        avgDur /= RUN_CNT;

        System.out.format("\n>>> GridGain serialization (average): %,d ms\n\n", avgDur);

        return avgDur;
    }

    private static SampleObject createObject() {
        long[] longArr = new long[3000];

        for (int i = 0; i < longArr.length; i++)
            longArr[i] = i;

        double[] dblArr = new double[3000];

        for (int i = 0; i < dblArr.length; i++)
            dblArr[i] = 0.1 * i;

        return new SampleObject(123, 123.456f, (short)321, longArr, dblArr);
    }

    private static class SampleObject 
        implements Externalizable, KryoSerializable {
        private int intVal;
        private float floatVal;
        private Short shortVal;
        private long[] longArr;
        private double[] dblArr;
        private SampleObject selfRef;

        public SampleObject() {}

        SampleObject(int intVal, float floatVal, Short shortVal, 
            long[] longArr, double[] dblArr) {
            this.intVal = intVal;
            this.floatVal = floatVal;
            this.shortVal = shortVal;
            this.longArr = longArr;
            this.dblArr = dblArr;

            selfRef = this;
        }

        // Required by Java Externalizable.
        @Override public void writeExternal(ObjectOutput out) 
            throws IOException {
            out.writeInt(intVal);
            out.writeFloat(floatVal);
            out.writeShort(shortVal);
            out.writeObject(longArr);
            out.writeObject(dblArr);
            out.writeObject(selfRef);
        }

        // Required by Java Externalizable.
        @Override public void readExternal(ObjectInput in) 
         throws IOException, ClassNotFoundException {
            intVal = in.readInt();
            floatVal = in.readFloat();
            shortVal = in.readShort();
            longArr = (long[])in.readObject();
            dblArr = (double[])in.readObject();
            selfRef = (SampleObject)in.readObject();
        }

        // Required by Kryo serialization.
        @Override public void write(Kryo kryo, Output out) {
            kryo.writeObject(out, intVal);
            kryo.writeObject(out, floatVal);
            kryo.writeObject(out, shortVal);
            kryo.writeObject(out, longArr);
            kryo.writeObject(out, dblArr);
            kryo.writeObject(out, selfRef);
        }

        // Required by Kryo serialization.
        @Override public void read(Kryo kryo, Input in) {
            intVal = kryo.readObject(in, Integer.class);
            floatVal = kryo.readObject(in, Float.class);
            shortVal = kryo.readObject(in, Short.class);
            longArr = kryo.readObject(in, long[].class);
            dblArr = kryo.readObject(in, double[].class);
            selfRef = kryo.readObject(in, SampleObject.class);
        }
    }
}

Tuesday, November 20, 2012

GridGain and Hadoop: Differences and Synergies

Overview

GridGain is Java-based middleware for in-memory processing of big data in a distributed environment. It is based on a high-performance in-memory data platform that integrates a fast In-Memory MapReduce implementation with In-Memory Data Grid technology, delivering easy-to-use and easy-to-scale software. Using GridGain you can process terabytes of data on 1000s of nodes in under a second.

GridGain typically resides between business, analytics, transactional or BI applications and long-term data storage such as RDBMS, ERP or Hadoop HDFS, and provides an in-memory data platform for high-performance, low-latency data storage and processing.

Both GridGain and Hadoop are designed for parallel processing of distributed data. However, the two products serve very different goals and in most cases complement each other. Hadoop is mostly geared towards batch-oriented offline processing of historical and analytics payloads where latencies and transactions don't really matter, while GridGain is meant for real-time in-memory processing of both transactional and non-transactional live data with very low latencies. To better understand where each product really fits, let us compare some of the main concepts of each product.

GridGain In-Memory Compute Grid vs Hadoop MapReduce

MapReduce is a programming model developed by Google for processing large sets of data stored on disk. Hadoop MapReduce is an implementation of that model. The model is based on the fact that data in a single file can be distributed across multiple nodes, and hence the processing of those files has to be co-located on the same nodes to avoid moving data around. The processing is based on scanning files record by record in parallel on multiple nodes and then reducing the results in parallel on multiple nodes as well. Because of that, standard disk-based MapReduce is good for problem sets which require analyzing every single record in a file, and is not a fit for cases where direct access to a certain data record is required. Furthermore, due to the offline batch orientation of Hadoop, it is not suited for low-latency applications.

GridGain In-Memory Compute Grid (IMCG), on the other hand, is geared towards in-memory computations and very low latencies. GridGain IMCG has its own implementation of MapReduce, which is designed specifically for real-time in-memory processing use cases and is very different from Hadoop's. Its main goal is to split a task into multiple sub-tasks, load balance those sub-tasks among available cluster nodes, execute them in parallel, then aggregate the results from those sub-tasks and return them to the user.
Splitting a task into multiple sub-tasks and assigning them to nodes is the mapping step, and aggregating the results is the reducing step. However, there is no concept of mandatory data built into this design, and it can work in the absence of any data at all, which makes it a good fit for both stateless and stateful computations, like traditional HPC. In cases when data is present, GridGain IMCG will also automatically colocate computations with the nodes where the data is, to avoid redundant data movement.

It is also worth mentioning that, unlike Hadoop, GridGain IMCG is very well suited for computations which are very short-lived in nature, e.g. below 100 milliseconds, and may not require any mapping or reducing.

Here is a simple Java coding example of GridGain IMCG which counts the number of letters in a phrase by splitting it into multiple words, assigning each word to a sub-task for parallel remote execution in the map step, and then adding up all the lengths received from remote jobs in the reduce step.

    int letterCount = g.reduce(
        BALANCE,
        // Mapper
        new GridClosure<String, Integer>() {
            @Override public Integer apply(String s) {
                return s.length();
            }
        },
        Arrays.asList("GridGain Letter Count".split(" ")),
        // Reducer
        F.sumIntReducer()
    );
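
If you do not have a grid at hand, the same split, execute, and aggregate shape can be tried on a single JVM. The sketch below is only a local analogy built on java.util.concurrent; it uses no GridGain API, and the class and method names are made up for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class LocalLetterCountSketch {
    static int letterCount(String phrase) {
        // Local thread pool standing in for remote grid nodes.
        ExecutorService pool = Executors.newFixedThreadPool(4);

        try {
            // Map step: one sub-task per word.
            List<Future<Integer>> futs = new ArrayList<>();

            for (final String word : phrase.split(" ")) {
                Callable<Integer> job = word::length;

                futs.add(pool.submit(job));
            }

            // Reduce step: sum the partial results.
            int sum = 0;

            for (Future<Integer> fut : futs)
                sum += fut.get();

            return sum;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();

            throw new IllegalStateException(e);
        }
        catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
        finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // "GridGain" (8) + "Letter" (6) + "Count" (5).
        System.out.println(letterCount("GridGain Letter Count")); // prints 19
    }
}
```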

GridGain In-Memory Data Grid vs Hadoop Distributed File System

Hadoop Distributed File System (HDFS) is designed for storing large amounts of data in files on disk. Just like in any file system, the data is mostly stored in textual or binary formats. Finding a single record inside an HDFS file requires a file scan. Also, being distributed in nature, updating a single record within a file in HDFS requires copying the whole file (a file in HDFS can only be appended to). This makes HDFS well suited for cases when data is appended at the end of a file, but not for cases when data needs to be located and/or updated in the middle of a file. With indexing technologies, like HBase or Impala, data access becomes somewhat easier because keys can be indexed, but not being able to index into values (*secondary indexes*) only allows for primitive query execution.

GridGain In-Memory Data Grid (IMDG), on the other hand, is an in-memory key-value data store. The roots of IMDGs are in distributed caching; however, GridGain IMDG also adds transactions, data partitioning, and SQL querying to cached data. The main difference from HDFS (or the Hadoop ecosystem overall) is the ability to transact and update any data directly in real time. This makes GridGain IMDG well suited for working on operational data sets, i.e. the data sets that are currently being updated and queried, while HDFS is suited for working on historical data which is constant and will never change.

Unlike a file system, GridGain IMDG works with the user domain model by directly caching user application objects. Objects are accessed and updated by key, which allows IMDG to work with volatile data that requires direct key-based access.

GridGain IMDG allows for indexing into keys and values (i.e. primary and secondary indices) and supports native SQL for data querying and processing. One of the unique features of GridGain IMDG is support for distributed joins, which allow complex SQL queries to be executed on in-memory data without limitations.
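
To make the difference between a primary and a secondary index concrete, here is a small single-JVM sketch. It says nothing about how GridGain implements indexing; the Person type and the method names are hypothetical. The primary index maps a key to a value, while the secondary index maps one field of the value back to the keys that hold it.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class SecondaryIndexSketch {
    /** Hypothetical cached value. */
    static class Person {
        final String name;
        final String city;

        Person(String name, String city) {
            this.name = name;
            this.city = city;
        }
    }

    /** Primary index: key -> value. */
    private final Map<Integer, Person> byId = new HashMap<>();

    /** Secondary index: value field (city) -> keys of matching entries. */
    private final Map<String, Set<Integer>> idsByCity = new HashMap<>();

    void put(int id, Person p) {
        Person old = byId.put(id, p);

        // Keep the secondary index in sync when an entry is overwritten.
        if (old != null) {
            Set<Integer> ids = idsByCity.get(old.city);

            ids.remove(id);

            if (ids.isEmpty())
                idsByCity.remove(old.city);
        }

        idsByCity.computeIfAbsent(p.city, c -> new TreeSet<>()).add(id);
    }

    /** Direct key-based access, no scan. */
    Person get(int id) {
        return byId.get(id);
    }

    /** Lookup by value field, answered from the secondary index. */
    Set<Integer> idsInCity(String city) {
        Set<Integer> ids = idsByCity.get(city);

        return ids == null ? Collections.<Integer>emptySet() : ids;
    }

    public static void main(String[] args) {
        SecondaryIndexSketch idx = new SecondaryIndexSketch();

        idx.put(1, new Person("Ann", "Boston"));
        idx.put(2, new Person("Bob", "Denver"));
        idx.put(3, new Person("Cid", "Boston"));

        System.out.println(idx.idsInCity("Boston")); // prints [1, 3]
    }
}
```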

GridGain and Hadoop Working Together

To summarize:
Hadoop is essentially a Big Data warehouse which is good for batch processing of historic data that never changes, while GridGain, on the other hand, is an In-Memory Data Platform which works with your current operational data set in a transactional fashion with very low latencies. Focusing on very different use cases makes GridGain and Hadoop very complementary to each other.

Up-Stream Integration

The diagram above shows integration between GridGain and Hadoop. Here we have GridGain In-Memory Compute Grid and Data Grid working directly in real time with the user application by partitioning and caching data within the data grid, and executing in-memory computations and SQL queries on it. Every so often, when data becomes historic, it is snapshotted into HDFS where it can be analyzed using Hadoop MapReduce and analytical tools from the Hadoop eco-system.

Down-Stream Integration

Another possible way to integrate is for cases when data is already stored in HDFS but needs to be loaded into IMDG for faster in-memory processing. For cases like that, GridGain provides fast loading mechanisms from HDFS into GridGain IMDG, where the data can be further analyzed using GridGain in-memory MapReduce and indexed SQL queries.

Conclusion

Integration between an in-memory data platform like GridGain and a disk-based data platform like Hadoop allows businesses to get valuable insights into the whole data set at once, including the volatile operational data set cached in memory, as well as the historic data set stored in Hadoop. This essentially eliminates any gaps in processing time caused by the Extract-Transform-Load (ETL) process of copying data from operational systems of record, like standard databases, into historic data warehouses like Hadoop. Now data can be analyzed and processed at any point of its lifecycle, from the moment it gets into the system up until it gets put away into a warehouse.


Monday, November 12, 2012

GridGain 4.3.1 Released


The GridGain 4.3.1 service release includes several important bug fixes and a host of new optimizations. It is 100% backward compatible and is a highly recommended update for anyone running production systems on the 4.x code line.

Details

Date: November 10th, 2012
Version: 4.3.1e
Build: 10112012

New Features and Enhancements

    • Added remove operation to data loader
    • Significantly improved performance of partition to node mapping
    • Added GridSerializationBenchmark for comparing performance of Java, Kryo, and GridGain serialization
    • Added property-based configuration to remote clients
    • Optimized concurrency for asynchronous methods in C++ client
    • Removed support for Groovy++ DSL Grover

Core Bug Fixes

    • Unmarshalling of SimpleDateFormat fails with NPE
    • Possible NPE in Indexing Manager when using distributed data structures
    • Swap partition iterator skips entries if off-heap iterator is empty
    • GridDataLoader does not allow caching of primitive arrays
    • Excessive memory consumption in indexing SPI
    • Add check on startup that GridOptimizedMarshaller is supported by running JDK version
    • If ordered message is timed out, other messages for the same topic may not be processed
    • ScalarPiCalculationExample does not provide correct estimate for PI

Client Connectivity Bug Fixes

    • Client router with explicit default configuration leads to NPE.
    • Repair REST client support to make session token and client ID optional
    • Ping does not work properly in C++ client

Visor Management Bug Fixes

    • Clear and Compact operations in Visor do not account for node selection
    • Move Visor management tasks into a separate thread pool
    • Preload dialog in Visor does not show correct number of keys
    • GC dialog in Visor waits indefinitely for dead nodes
    • Increase tooltip dismiss time in Visor
    • Visor log search does not show nodes table correctly on Windows

Thursday, November 1, 2012

In-Memory Data Grids... Explained


In-memory processing has been a pretty hot topic lately. Many companies that historically would not have considered using in-memory technology because it was cost prohibitive are now changing their core systems’ architectures to take advantage of the low-latency transaction processing that in-memory technology offers. This is a consequence of the fact that the price of RAM is dropping significantly and rapidly, and as a result it has become economical to load the entire operational dataset into memory, with performance improvements of over 1000x. In-Memory Compute and Data Grids provide the core capabilities of an in-memory architecture.

The goal of In-Memory Data Grids (IMDG) is to provide extremely high availability of data by keeping it in memory and in a highly distributed (i.e. parallelized) fashion. By loading terabytes of data into memory, IMDGs are able to support most of the Big Data processing requirements today.

At a very high level, an IMDG is a distributed object store similar in interface to a typical concurrent hash map. You store objects with keys. Unlike traditional systems where keys and values are often limited to byte arrays or strings, with IMDGs you can use any domain object as either value or key. This gives tremendous flexibility by allowing you to keep exactly the same object your business logic is dealing with in the Data Grid without the extra step of marshaling and de-marshaling that alternative technologies would require. It also simplifies the usage of your data grid as you can, in most cases, interface with the distributed data store as you do with a simple hash map. Being able to work with domain objects directly is one of the main differences between IMDGs and In-Memory Databases (IMDB). In the case of the latter, users still need to perform Object-To-Relational Mapping which typically adds significant performance overhead.
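
As a rough single-JVM analogy for that interface (not a distributed store, and not GridGain code), here is a ConcurrentHashMap keyed by a domain object. AccountKey and Account are hypothetical types made up for the illustration; the point to note is that a domain object used as a key needs proper equals() and hashCode().

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class DomainObjectCacheSketch {
    /** Hypothetical domain key: equals() and hashCode() make it usable as a map key. */
    static final class AccountKey {
        final String branch;
        final long number;

        AccountKey(String branch, long number) {
            this.branch = branch;
            this.number = number;
        }

        @Override public boolean equals(Object o) {
            if (this == o)
                return true;

            if (!(o instanceof AccountKey))
                return false;

            AccountKey other = (AccountKey)o;

            return number == other.number && branch.equals(other.branch);
        }

        @Override public int hashCode() {
            return Objects.hash(branch, number);
        }
    }

    /** Hypothetical domain value, stored as is. */
    static final class Account {
        final String owner;
        final long balanceCents;

        Account(String owner, long balanceCents) {
            this.owner = owner;
            this.balanceCents = balanceCents;
        }
    }

    static ConcurrentMap<AccountKey, Account> newCache() {
        return new ConcurrentHashMap<>();
    }

    public static void main(String[] args) {
        ConcurrentMap<AccountKey, Account> cache = newCache();

        cache.put(new AccountKey("NYC", 42), new Account("Alice", 100_00));

        // A different but equal key instance finds the same entry.
        System.out.println(cache.get(new AccountKey("NYC", 42)).owner); // prints Alice
    }
}
```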

There are also some other features in IMDGs that distinguish them from other products, such as NoSQL databases, IMDBs, or NewSQL databases. One of the main differences is truly scalable Data Partitioning across the cluster. Essentially, IMDGs in their purest form can be viewed as distributed hash maps with every key cached on a particular cluster node - the bigger the cluster, the more data you can cache. The trick to this architecture is to make sure that you collocate your processing with the cluster nodes where data is cached, so that all cache operations become local and there is no (or minimal) data movement within the cluster. In fact, when using well designed IMDGs, there should be absolutely no data movement on stable topologies. The only time when some of the data is moved is when new nodes join or some existing nodes leave the cluster, thus causing some data repartitioning within the cluster.
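
One way to picture key-to-node assignment is the sketch below. It uses rendezvous (highest-random-weight) hashing, which is my choice for the illustration and not necessarily what any particular IMDG does; the node names and the weight function are assumptions. Each key is owned by the node with the highest weight for that key, so when a node joins, the only keys that move are the ones the new node takes over.

```java
import java.util.Arrays;
import java.util.List;

class KeyPartitioningSketch {
    /** Combines key and node hashes into one pseudo-random weight. */
    static long weight(String key, String node) {
        long h = 31L * key.hashCode() + node.hashCode();

        // Bit-mixing step to spread the combined hash over 64 bits.
        h ^= h >>> 33;
        h *= 0xff51afd7ed558ccdL;
        h ^= h >>> 33;

        return h;
    }

    /** Returns the node with the highest weight for the key, or null if there are no nodes. */
    static String ownerOf(String key, List<String> nodes) {
        String best = null;
        long bestWeight = Long.MIN_VALUE;

        for (String node : nodes) {
            long w = weight(key, node);

            if (best == null || w > bestWeight) {
                best = node;
                bestWeight = w;
            }
        }

        return best;
    }

    public static void main(String[] args) {
        List<String> before = Arrays.asList("node-A", "node-B", "node-C");
        List<String> after = Arrays.asList("node-A", "node-B", "node-C", "node-D");

        int moved = 0;

        for (int i = 0; i < 1000; i++) {
            String key = "k" + i;

            if (!ownerOf(key, before).equals(ownerOf(key, after)))
                moved++;
        }

        // Every moved key now belongs to node-D; the rest stayed where they were.
        System.out.println("Keys moved after node-D joined: " + moved + " of 1000");
    }
}
```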

The picture below shows a classic IMDG with a key set of {k1, k2, k3} where each key belongs to a different node. The external database component is optional. If present, then IMDGs will usually automatically read data from the database or write data to it.

Another distinguishing characteristic of IMDGs is Transactional ACID support. Generally a 2-phase-commit (2PC) protocol is used to ensure data consistency within the cluster. Different IMDGs will have different underlying locking mechanisms, but usually more advanced implementations will provide concurrent locking mechanisms (GridGain, for instance, uses MVCC - multi-version concurrency control) and reduce network chattiness to a minimum, hence guaranteeing transactional ACID consistency with very high performance.
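
For readers who have not seen 2PC spelled out, here is a toy in-memory coordinator. It is a textbook-style sketch with hypothetical names, with no networking, locking, timeouts, or recovery, and it is not meant to reflect any product's optimized protocol. In phase one every participant votes; in phase two all of them commit only if every vote was yes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TwoPhaseCommitSketch {
    interface Participant {
        /** Phase 1: returns true if this participant can commit. */
        boolean prepare();

        /** Phase 2: makes the change permanent. */
        void commit();

        /** Undoes a successful prepare. */
        void rollback();
    }

    /** Runs both phases; returns true if the transaction committed. */
    static boolean run(List<? extends Participant> participants) {
        List<Participant> prepared = new ArrayList<>();

        // Phase 1: collect votes, stop at the first "no".
        for (Participant p : participants) {
            if (p.prepare())
                prepared.add(p);
            else {
                for (Participant q : prepared)
                    q.rollback();

                return false;
            }
        }

        // Phase 2: everyone voted yes, so everyone commits.
        for (Participant p : participants)
            p.commit();

        return true;
    }

    /** Trivial participant that records its state and votes as configured. */
    static class InMemoryParticipant implements Participant {
        private final boolean vote;

        String state = "INIT";

        InMemoryParticipant(boolean vote) {
            this.vote = vote;
        }

        @Override public boolean prepare() {
            state = vote ? "PREPARED" : "ABORTED";

            return vote;
        }

        @Override public void commit() {
            state = "COMMITTED";
        }

        @Override public void rollback() {
            state = "ROLLED_BACK";
        }
    }

    public static void main(String[] args) {
        InMemoryParticipant a = new InMemoryParticipant(true);
        InMemoryParticipant b = new InMemoryParticipant(false);

        System.out.println(run(Arrays.asList(a, b))); // prints false
        System.out.println(a.state + " " + b.state); // prints ROLLED_BACK ABORTED
    }
}
```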

Data consistency is one of the main differences between IMDGs and NoSQL databases. NoSQL databases are usually designed using an Eventual Consistency (EC) approach where data is allowed to be inconsistent for a period of time as long as it will become consistent *eventually*. Generally, the writes on EC-based systems are somewhat fast, but reads are slow (or to be more precise, as fast as writes are). Latest IMDGs with an *optimized* 2PC should at least match if not outperform EC-based systems on writes, and be significantly faster on reads. It is interesting to note that the industry has made a full circle moving from a then-slow 2PC approach to the EC approach, and now from EC to a much faster *optimized* 2PC.

Different products provide different 2PC optimizations, but generally the purpose of all optimizations is to increase concurrency, minimize network overhead, and reduce the number of locks a transaction requires to complete. As an example, Google's distributed global database, Spanner, is based on a transactional 2PC approach simply because 2PC provided a faster and more straightforward way to guarantee data consistency and high throughput compared to MapReduce or EC.

Even though IMDGs usually share some common basic functionality, there are many features and implementation details that differ between vendors. When evaluating an IMDG product, pay attention to eviction policies, (pre)loading techniques, concurrent repartitioning, memory overhead, etc. Also pay attention to the ability to query data at runtime. Some IMDGs, such as GridGain, allow users to query in-memory data using standard SQL, including support for distributed joins, which is pretty rare.

Storing data in an IMDG is only half of the functionality required for an in-memory architecture. This data must also be processed in a high-performance and parallelized manner. The typical in-memory architecture partitions data across the cluster using an IMDG, and then computations are sent to the nodes where the data is for collocated (local) execution. Since computations are usually part of Compute Grids and have to be properly deployed, load-balanced, failed-over, or scheduled, the integration between Compute Grids and IMDGs is very important. It is especially beneficial if both In-Memory Compute and Data Grids are part of the same product and utilize the same APIs which removes the burden of integration from the developer and usually renders the highest performance and reliable in-memory systems.



IMDGs (together with Compute Grids) are used throughout a wide spectrum of industries in applications as diverse as Risk Analytics, Trading Systems, real time Fraud Detection, Biometrics, eCommerce, or Online Gaming. Essentially every project that struggles with scalability and performance can benefit from In-Memory Processing and IMDG architecture.

Friday, September 21, 2012

MergeSort on GridGain

Here is an example of how you can perform MergeSort on a distributed grid product like GridGain. This example is somewhat artificial, as you probably would never do the same thing in real life (executing the same code locally is most likely faster), but it does demonstrate some pretty cool features of GridGain, like recursive task execution and continuations.

This is the task class which splits an array in two and sends remote jobs to sort the new arrays. The remote jobs in turn execute the same task over and over again until we get to an array size of 1, after which we begin the merge process.
class GridMergeSortTask extends GridTaskSplitAdapter<int[], int[]> {
    // Injected Grid instance.
    @GridInstanceResource private Grid grid;

    @Override 
    protected Collection<GridJob> split(int gridSize, int[] initArr) {
        Collection<GridJob> jobs = new LinkedList<GridJob>();

        for (final int[] arr : splitArray(initArr)) {
            jobs.add(new GridJobAdapterEx() {
                // Auto-inject job context.
                @GridJobContextResource
                private GridJobContext jobCtx;

                // Task execution result future.
                private GridTaskFuture<int[]> fut;

                @Override public Object execute() throws GridException {
                    if (arr.length == 1)
                        return arr;

                    // Future is null before holdcc() is called and
                    // not null after callcc() is called.
                    if (fut == null) {
                        // Launch the recursive child task asynchronously.
                        fut = grid.execute(new GridMergeSortTask(), arr);

                        // Add a listener to the future, that will resume the
                        // parent task once the child one is completed.
                        fut.listenAsync(new GridInClosure<GridFuture<int[]>>() {
                            @Override public void apply(GridFuture<int[]> fut) {
                                // CONTINUATION:
                                // =============
                                // Resume suspended job execution.
                                jobCtx.callcc();
                            }
                        });

                        // CONTINUATION:
                        // =============
                        // Suspend job execution to be continued later and
                        // release the executing thread.
                        return jobCtx.holdcc();
                    }
                    else {
                        assert fut.isDone();

                        // Return the result of a completed child task.
                        return fut.get();
                    }
                }
            });
        }

        return jobs;
    }

    /**
     * GridTask reduce logic. This method is called when both child jobs
     * are completed, and is a Reduce step of Merge Sort algorithm.
     */
    @Override public int[] reduce(List<GridJobResult> results) {
        // This is in case we have a single-element array.
        if (results.size() == 1)
            return results.get(0).getData();

        assert results.size() == 2;

        int[] arr1 = results.get(0).getData();
        int[] arr2 = results.get(1).getData();

        return mergeArrays(arr1, arr2);
    }

    private static Iterable<int[]> splitArray(int[] arr) {
        int len1 = arr.length / 2;
        int len2 = len1 + arr.length % 2;

        int[] a1 = new int[len1];
        int[] a2 = new int[len2];

        System.arraycopy(arr, 0, a1, 0, len1);
        System.arraycopy(arr, len1, a2, 0, len2);

        System.out.println("Split array [arr1Len=" + a1.length + 
            ", arr2Len=" + a2.length + ']');

        return Arrays.asList(a1, a2);
    }

    private static int[] mergeArrays(int[] arr1, int[] arr2) {
        int[] ret = new int[arr1.length + arr2.length];

        int i1 = 0;
        int i2 = 0;

        // Merge 2 arrays into a resulting array
        for (int i = 0; i < ret.length; i++) {
            if (i1 >= arr1.length) {
                System.arraycopy(arr2, i2, ret, i, arr2.length - i2);

                break;
            }
            else if (i2 >= arr2.length) {
                System.arraycopy(arr1, i1, ret, i, arr1.length - i1); 

                break;
            }
            else
                ret[i] = arr1[i1] <= arr2[i2] ? arr1[i1++] : arr2[i2++];
        }

        System.out.println("Merged arrays [resLen=" + ret.length + 
            ", arr1Len=" + arr1.length + ", arr2Len=" + arr2.length + ']');

        return ret;
    }
}
And here is how you would call this task:
    public static void main(String[] args) throws GridException {
        Grid grid = G.start();

        try {
            int[] inArr = generateRandomArray(30);

            System.out.println("Unsorted array: " + Arrays.toString(inArr));

            int[] outArr = grid.execute(new GridMergeSortTask(), inArr).get();

            System.out.println("Sorted array: " + Arrays.toString(outArr));
        }
        finally {
            G.stop(true);
        }
    }

    private static int[] generateRandomArray(int size) {
        int[] ret = new int[size];

        Random rnd = new Random();

        for (int i = 0; i < ret.length; i++)
            ret[i] = rnd.nextInt(100);

        return ret;
    }
Enjoy!

Wednesday, September 5, 2012

10 Useful Performance Tuning Tips For GridGain

I have been getting many questions about how to tune GridGain, so I decided to create a brief manual which covers the most important tuning properties.

1. GridGain is multi-threaded - Use It

If you are experiencing somewhat slow performance for cache updates, you should ask yourself whether you are utilizing the full computing power (all the cores) of your machine. GridGain is multi-threaded internally, but if you are doing sequential operations one after another from a single thread, then you are not using multithreading. Generally it makes sense to use a number of threads about 2 or 3 times the number of cores for populating the grid. All GridGain APIs are thread-safe, so you don't have to worry about any concurrency issues when populating data.
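
The pattern looks roughly like the sketch below. To keep it runnable without a grid, a ConcurrentHashMap stands in for the cache (that substitution and all names are mine), and the thread count is 2x the number of cores as suggested above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ParallelPopulateSketch {
    static ConcurrentMap<Integer, String> populate(final int entries) {
        // About 2x the number of cores.
        final int threads = 2 * Runtime.getRuntime().availableProcessors();

        // Stand-in for a thread-safe cache API.
        final ConcurrentMap<Integer, String> cache = new ConcurrentHashMap<>();

        ExecutorService pool = Executors.newFixedThreadPool(threads);

        for (int t = 0; t < threads; t++) {
            final int first = t;

            // Each worker handles its own stripe of keys: first, first + threads, ...
            pool.execute(() -> {
                for (int key = first; key < entries; key += threads)
                    cache.put(key, "value-" + key);
            });
        }

        pool.shutdown();

        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES))
                throw new IllegalStateException("Population did not finish in time.");
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();

            throw new IllegalStateException(e);
        }

        return cache;
    }

    public static void main(String[] args) {
        System.out.println(populate(100_000).size()); // prints 100000
    }
}
```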

2. Use Collocated Computations

GridGain enables you to execute MapReduce computations in memory. However, most computations usually work on some data which is cached on remote grid nodes. Loading that data from remote nodes is usually expensive, and it is a lot cheaper to send the computation to the node where the data is. The easiest way to do it is to use the GridProjection.affinityRun(...) method; however, GridGain also has plenty of "mapKeysToNodes(...)" methods to help users figure out data ownership within the grid.

3. Use Data Loader

If you need to upload lots of data into cache, use org.gridgain.grid.GridDataLoader to do it. Data loader will properly batch the updates before sending them to remote nodes and will control the number of parallel operations taking place on each node to avoid thrashing. Generally it is about 10x faster than doing a bunch of single-threaded updates.
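The batching part of this can be sketched in a few lines. The class below is not GridDataLoader and does not try to mimic its API; BatchingLoader, its methods and the plain map used as the "remote" target are all hypothetical. It only shows why batching helps: many single updates turn into a few bulk operations.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class BatchingLoader {
    private final Map<Integer, String> target; // Stand-in for a remote cache.
    private final int batchSize;
    private final Map<Integer, String> buffer = new HashMap<>();
    private int flushes;

    BatchingLoader(Map<Integer, String> target, int batchSize) {
        this.target = target;
        this.batchSize = batchSize;
    }

    /** Buffers an update and sends the buffer once it reaches the batch size. */
    void add(int key, String val) {
        buffer.put(key, val);

        if (buffer.size() >= batchSize)
            flush();
    }

    /** Sends all buffered updates as one bulk operation. */
    void flush() {
        if (buffer.isEmpty())
            return;

        target.putAll(buffer);
        buffer.clear();
        flushes++;
    }

    /** Loads the given number of entries and returns how many bulk sends were needed. */
    static int loadAndCountFlushes(int entries, int batchSize) {
        Map<Integer, String> target = new ConcurrentHashMap<>();
        BatchingLoader ldr = new BatchingLoader(target, batchSize);

        for (int i = 0; i < entries; i++)
            ldr.add(i, "v" + i);

        ldr.flush(); // Send whatever is left in the buffer.

        if (target.size() != entries)
            throw new IllegalStateException("Some entries were lost");

        return ldr.flushes;
    }

    public static void main(String[] args) {
        System.out.println("Bulk sends for 1050 entries: " + loadAndCountFlushes(1050, 100));
    }
}
```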

4. Tune Initial Cache Size

To avoid internal resizing of cache maps, you should always provide a proper cache start size. Not doing so can significantly hurt performance, as CPU cycles will be spent on GridGain resizing internal cache maps instead of on application logic. You can configure the cache start size via the GridCacheConfiguration.getStartSize() configuration property.
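The same effect is easy to see with a plain java.util.HashMap, which is used below purely as an analogy; how GridGain sizes its internal maps is not shown here. With the default load factor of 0.75, a map created with the capacity computed below holds the expected number of entries without ever resizing. The names StartSize and initialCapacityFor are made up for this example.

```java
import java.util.HashMap;
import java.util.Map;

final class StartSize {
    /** Capacity at which a HashMap (load factor 0.75) never resizes for this many entries. */
    static int initialCapacityFor(int expectedEntries) {
        return (int) Math.ceil(expectedEntries / 0.75);
    }

    public static void main(String[] args) {
        int expected = 100_000;

        // Sized up front, so no rehashing happens during the load below.
        Map<Integer, String> cache = new HashMap<>(initialCapacityFor(expected));

        for (int i = 0; i < expected; i++)
            cache.put(i, "v" + i);

        System.out.println("Entries: " + cache.size());
    }
}
```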

5. Tune Near Cache

When using Partitioned cache, GridGain will front this cache with a local Near cache, so that an entry that does not belong to local partitions is still kept in a smaller local cache for better performance on the next access.

However, most usages of GridGain happen from collocated computations, i.e. computations submitted to the grid are usually routed automatically to the nodes where the data resides. In cases like this, using Near cache is redundant, as all data access happens from local memory anyway. To avoid the overhead, you can disable Near cache by setting the GridCacheConfiguration.isNearEnabled() configuration property to false.

6. Tune Off-Heap Memory

If you plan to allocate large amounts of memory to your JVM for data caching (usually more than 10GB of memory), then your application will most likely suffer from prolonged stop-the-world GC pauses which can significantly hurt latencies. To avoid GC pauses, use off-heap memory to cache data - essentially your data is still cached in memory, but it lives outside the Java heap, so GC is not affected.

The only configuration property to set to enable off-heap memory is GridCacheConfiguration.getMaxOffHeapMemory() which will tell GridGain how much off-heap memory to make available for your application. By default off-heap memory is disabled.
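For readers who have not worked with off-heap memory before, the sketch below shows the general idea using a direct ByteBuffer from the JDK. It is not how GridGain implements its off-heap storage; the OffHeapLongs class is a hypothetical example that only demonstrates data living outside the garbage-collected heap.

```java
import java.nio.ByteBuffer;

final class OffHeapLongs {
    // Memory for a direct buffer is allocated outside the Java heap.
    private final ByteBuffer buf;

    OffHeapLongs(int capacity) {
        buf = ByteBuffer.allocateDirect(capacity * Long.BYTES);
    }

    /** Writes a value at the given slot using an absolute put. */
    void set(int idx, long val) {
        buf.putLong(idx * Long.BYTES, val);
    }

    /** Reads the value at the given slot using an absolute get. */
    long get(int idx) {
        return buf.getLong(idx * Long.BYTES);
    }

    boolean isOffHeap() {
        return buf.isDirect();
    }

    public static void main(String[] args) {
        OffHeapLongs store = new OffHeapLongs(1024);

        store.set(7, 12345L);

        System.out.println("Slot 7: " + store.get(7) + ", off-heap: " + store.isOffHeap());
    }
}
```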

7. Tune Swap Storage

First of all, if you don't plan to use swap storage (i.e. disk overflow storage), you should not change any default swap settings (swap storage is disabled by default). If you do need to use swap storage, then you should enable it via GridCacheConfiguration.isSwapEnabled() configuration property.

8. Tune Query Indexing

There are several configuration properties that you should watch out for here. First of all and most importantly, if you don't plan to use cache queries at all, you should disable indexing altogether via GridCacheConfiguration.isQueryIndexEnabled() configuration property.

If you do plan to use cache queries, you should properly enable/disable indexing of primitive keys and values on GridH2IndexingSpi. You should enable indexing for primitive keys by setting setDefaultIndexPrimitiveKey() to true on the SPI only if you plan to use primitive cache keys in your cache queries. The same goes for indexing primitive values controlled by setDefaultIndexPrimitiveValue(...) property. 

Also, if for every value class you don't plan to have different key classes (essentially every value class has one key class), set setDefaultIndexFixedTyping(...) on the SPI to true. This way GridGain will store key types as corresponding SQL types instead of binary form which provides faster performance for key lookups.

9. Tune Eviction Policy

Again, if you don't plan to over-populate your cache, i.e. if you don't need any eviction policy at all, then you should disable eviction policy altogether via GridCacheConfiguration.isEvictionEnabled() configuration property. 

If you do need GridGain to make sure that data in cache does not grow beyond allowed memory limits, you should carefully choose the eviction policy you need. Most likely you will need either the FIFO or the LRU eviction policy shipped with GridGain; however, depending on your application, you may need to configure LIRS or plug in your own custom eviction policy. Regardless of which eviction policy you use, you should carefully choose the maximum number of entries in cache allowed by the eviction policy - if cache size grows beyond this limit, then evictions will start occurring. Usually max size is controlled by the setMaxSize(...) configuration property on the instance of the eviction policy.

You should also almost always configure "setAllowEmptyEntries(...)" configuration property to false. By default GridGain will keep entries with null values in cache to preserve some other properties of the entry, like time-to-live for example. However, if you don't use time-to-live then most likely you should discard the entry once it gets expired or invalidated.
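To show what an LRU policy with a maximum size does, here is a minimal sketch built on java.util.LinkedHashMap. It is not GridGain's LRU eviction policy; the LruCache class is a hypothetical stand-in, and maxSize plays the role that setMaxSize(...) plays above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class LruCache<K, V> extends LinkedHashMap<K, V> {
    private final int maxSize;

    LruCache(int maxSize) {
        // accessOrder = true: iteration order goes from least to most recently used.
        super(16, 0.75f, true);

        this.maxSize = maxSize;
    }

    /** Called by LinkedHashMap after each insert; returning true evicts the eldest entry. */
    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxSize;
    }

    public static void main(String[] args) {
        LruCache<String, Integer> cache = new LruCache<>(2);

        cache.put("a", 1);
        cache.put("b", 2);
        cache.get("a");    // Touch "a", so "b" becomes the least recently used.
        cache.put("c", 3); // Exceeds max size and evicts "b".

        System.out.println("Keys left: " + cache.keySet());
    }
}
```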

10. Use Write-Behind Caching

If you can afford for your persistent store to lag behind your in-memory cache, then use write-behind caching. When write-behind is enabled, GridGain will batch up cache updates and flush them to the database in the background, which can often provide significant performance benefits. You can enable write-behind caching via GridCacheConfiguration.isWriteBehindEnabled() configuration property.
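The mechanics can be sketched as follows. This is not GridGain's write-behind implementation; WriteBehindCache and its methods are hypothetical, and the "database" is just a list that records each batch. In a real setup flush() would be called periodically from a background thread; here it is called explicitly to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class WriteBehindCache {
    private final Map<String, String> memory = new HashMap<>();
    private final Map<String, String> dirty = new LinkedHashMap<>();

    // Stand-in for a database: every element is one batched write.
    private final List<Map<String, String>> storeBatches = new ArrayList<>();

    /** Updates memory right away and only marks the entry as dirty for the store. */
    synchronized void put(String key, String val) {
        memory.put(key, val);
        dirty.put(key, val); // Repeated updates to one key collapse into one write.
    }

    synchronized String get(String key) {
        return memory.get(key);
    }

    /** Writes all dirty entries as a single batch and returns the batch size. */
    synchronized int flush() {
        if (dirty.isEmpty())
            return 0;

        Map<String, String> batch = new LinkedHashMap<>(dirty);

        dirty.clear();
        storeBatches.add(batch);

        return batch.size();
    }

    synchronized int storeWrites() {
        return storeBatches.size();
    }

    public static void main(String[] args) {
        WriteBehindCache cache = new WriteBehindCache();

        cache.put("a", "1");
        cache.put("a", "2");
        cache.put("b", "3");

        System.out.println("Flushed entries: " + cache.flush() + ", store writes: " + cache.storeWrites());
    }
}
```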

Wednesday, July 18, 2012

Can Delaying Events Boost Performance Of Your Cluster?

Should you ever delay events in your cluster? It is an interesting question. As engineers we are used to thinking that performance matters and that delaying things goes directly against performance, right? Well, not always - there are quite a few cases in a distributed environment when delaying things actually helps performance. For example, the most common case is when you have a continuous stream of asynchronous computations on the grid. Since sending a computation is much faster than processing it, you can essentially overload the grid, and some sort of back pressure mechanism will be needed (usually you would just limit the number of active computations your grid can handle concurrently). However, in this blog I want to focus on delaying data operations rather than computations.

If you have heard about partitioned caching, then you probably know that whenever a new node joins the grid or an existing node leaves the grid, cluster repartitioning happens. This basically means that, in the case of a new node, it has to take responsibility for some of the data cached on other nodes, and in the case of a node leaving the grid, other nodes have to take responsibility for the data cached on that node. Essentially this results in data movement between data grid nodes. The picture below illustrates how keys get partitioned among caching data nodes (share-nothing architecture):


Now imagine that you need to bring multiple nodes up concurrently. The 1st node that comes up will take responsibility for some portion of the data cached on other nodes and will start loading that portion of the data from other nodes. When a 2nd node comes up, it will also take responsibility for some portion of the data, including some data from the 1st node that was just started, and now a portion of the data that was moved to the 1st node will have to be moved to the 2nd node. Ouch - wouldn't it be more efficient to wait till the 2nd node comes up to start data preloading? The same happens when nodes 3, 4, etc. come up. So the most efficient way to preload keys and to avoid the extra network traffic caused by moving data between newly started nodes is to delay preloading until the last node starts.
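The effect is easy to reproduce in a small simulation. The sketch below assigns keys to nodes with rendezvous (highest-random-weight) hashing, which is used here only as a simple stand-in and is not GridGain's actual partitioning function; the class DelayedPreload and all of its methods are hypothetical. It counts how many key transfers happen when preloading runs after every node start versus once after the last node has started.

```java
final class DelayedPreload {
    /** Deterministic 64-bit mix of key and node id (splitmix64-style finalizer). */
    static long weight(int key, int node) {
        long z = key * 0x9E3779B97F4A7C15L + node * 0xBF58476D1CE4E5B9L;

        z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L;
        z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL;

        return z ^ (z >>> 31);
    }

    /** Owner of a key is the node with the highest weight for that key. */
    static int owner(int key, int nodeCnt) {
        int best = 0;

        for (int n = 1; n < nodeCnt; n++)
            if (weight(key, n) > weight(key, best))
                best = n;

        return best;
    }

    /** Keys that change owner when the grid grows from one size to another in a single step. */
    static int moved(int keys, int from, int to) {
        int cnt = 0;

        for (int k = 0; k < keys; k++)
            if (owner(k, from) != owner(k, to))
                cnt++;

        return cnt;
    }

    /** Preloading after every node start: transfers add up across all steps. */
    static int eagerTransfers(int keys, int initial, int last) {
        int total = 0;

        for (int n = initial; n < last; n++)
            total += moved(keys, n, n + 1);

        return total;
    }

    /** Preloading once after the last node start: each key moves at most once. */
    static int delayedTransfers(int keys, int initial, int last) {
        return moved(keys, initial, last);
    }

    public static void main(String[] args) {
        System.out.println("Eager transfers:   " + eagerTransfers(10_000, 2, 5));
        System.out.println("Delayed transfers: " + delayedTransfers(10_000, 2, 5));
    }
}
```

A key that changes owner between the initial and final topology must have moved at least once along the way, so the delayed count can never exceed the eager one; the difference is exactly the traffic spent moving data between newly started nodes.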

At GridGain, we introduced this Delayed Preloading feature in our latest 4.2 release, and the user is now in full control of when data preloading happens - either right away upon node start, after a certain delay, or manually from our Visor DevOps Console (note the Preload button on the right-hand side):




As you can see, it does help to delay certain events in distributed systems to achieve better performance.