
Take Control of ADO.NET Serialization

November 3, 2010

ADO.NET Data Services provides a rich way to access data over the network using REST APIs. It's easy to use because the .NET libraries let you hook up to many different kinds of data sources, connect over networks, and use high-level query syntax such as LINQ. In this article, I describe how to take control of the serialization on the server side of an ADO.NET data service.

Background

Typically, ADO.NET is used to access data stored in relational databases or key/value stores, but it doesn’t (natively) support streaming of continuous entities.  Nevertheless, with a bit of work, you can take over the connection and start streaming to clients yourself.  Tom Laird-McConnell wrote an excellent article that describes this technique.  I’m using the software that Tom created to stream data to hundreds of clients.

I won’t repeat why streaming from ADO.NET is an intriguing option, because the reasons are bulleted at the start of Tom’s article; the rest of my post assumes you’ve read or at least skimmed his main points. The gist is to create a streaming data service class that derives from DataService<> and implements the IHttpHandler interface; the latter allows control over the connection. You also must create a class implementing IDataServiceHost, which attaches to each DataService connection and gives control over the connection state. And you need to implement an IQueryable<> wrapper that prevents clients from requesting actions that don’t make sense on endless streams, such as sorting. Finally, your data source returns an enumeration over the real-time stream of data by wrapping it in this IQueryable<>. An implementation of each of these classes is presented in Tom’s post.

The Problem

I have a system that takes incoming data from social networks, processes it, and then streams the results to many different consumers (including the Bing search index, trend detectors, and maps). As soon as data is processed, it is served to clients as an endless stream of data entities. This is a classic producer-consumer system, with a single producer and many consumers.

As more and more clients connect, the CPU begins to saturate. As it turns out, the DataService serializes each entity to either Atom XML or JSON. In our case, each client is pulling the same data from the real-time stream, so if 100 clients receive the same entity, it gets serialized 100 times, and this dominates the overall processing. As you can imagine, this is not an effective use of CPU. To remedy it, we need to serialize each entity to a string once and reuse that string for each client. Unfortunately, as customizable as the DataService is, there doesn’t seem to be a hook for controlling serialization on the server side.

From what I found by examining the DataService code in .NET Reflector, all the serialization code is private and not overridable. I did find a promising interface that allows control over the way data is serialized for WCF services, but I still haven’t determined whether (and how) it could be leveraged in my case. Perhaps it’s possible, but I had already forged ahead with a different technique before discovering it.

The Approach

Below I include many code snippets that use the types introduced in Tom’s article to demonstrate how to manually control serialization.  The key is to take over the processing of the request:
public class StreamingDataService<DataModelT> : DataService<DataModelT>, IHttpHandler where DataModelT : class
{
    public void ProcessRequest(HttpContext context)
    {
        ...
        // This will send results until the client connection is dropped
        ProcessRequest();
        ...
    }
}
Becomes:
public abstract class StreamingDataService<DataModelT> : DataService<DataModelT>, IHttpHandler where DataModelT : class
{
    public void ProcessRequest(HttpContext context)
    {
        ...
        bool processed = false;
        if (_manualQueryProcessingEnabled)
            processed = ProcessRequestManually(context, httpHost.ResponseStream);  // httpHost is the IDataServiceHost wrapper
        if (!processed)
            ProcessRequest();  // fall back to the default ADO.NET handler
        ...
    }
 
    protected abstract bool ProcessRequestManually(HttpContext context, Stream responseStream);
}

Notice that it is possible for ProcessRequestManually to decide it can’t handle the request by returning false, in which case the code falls back on the default processing. This is because I implemented a simple (dumb) parser that understands a subset of possible queries; it handles all the queries the clients currently use, but I didn’t want it to drop new queries it can’t yet parse, so it lets ADO.NET take over (bypassing the serialization cache). These unhandled queries are logged; if they become frequent, the parser is extended to handle them. The _manualQueryProcessingEnabled flag is a runtime configuration option that lets me easily disable the feature when I need to.
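
As an aside, here is a minimal sketch of how such a flag might be read (my assumption; the original doesn’t show where the flag comes from, and the appSettings key name is hypothetical):

// Read the switch from web.config appSettings so manual processing can be turned
// off without changing code. The key name "ManualQueryProcessingEnabled" is hypothetical.
static bool ReadManualQueryFlag()
{
    string value = System.Configuration.ConfigurationManager.AppSettings["ManualQueryProcessingEnabled"];
    bool enabled;
    return bool.TryParse(value, out enabled) && enabled;
}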

Here’s ProcessRequestManually, which is added to the QuoteStreamService class:

protected override bool ProcessRequestManually(HttpContext context, Stream responseStream)
{
    bool processed = false;
    if (context.Request.PathInfo.StartsWith("/Quotes"))
    {
        StockQuoteStream service = this.CreateDataSource() as StockQuoteStream;
        if (service != null)
            processed = ManualQueryExecutor.ProcessRequest(service.Quotes, responseStream, context.Request, context.Response);
    }
    return processed;
}

ProcessRequestManually creates a data source, which is the StockQuoteStream object. This is the object the query executes against, so it gets passed to the following ProcessRequest function, which lives in a static class called ManualQueryExecutor:

static byte[] _jsonPrefix = Encoding.UTF8.GetBytes("{\n\"d\": [\n");
static byte[] _jsonPostfix = Encoding.UTF8.GetBytes("]\n}\n");
 
// Flush thresholds (the values here are illustrative): flush after this many bytes
// or after this much time has passed, whichever comes first.
static int _flushWhen = 64 * 1024;
static TimeSpan _flushInterval = TimeSpan.FromSeconds(1);
 
public static bool ProcessRequest(IQueryable query, Stream responseStream, HttpRequest request, HttpResponse response)
{
    query = GetQuery(query, request);
    if (query == null)
        return false;
 
    DateTime lastFlush = DateTime.UtcNow;
    int bytesOut = 0;
    bool started = false;
 
    // dynamic is used here for brevity; the entities expose the ToJson() method described below
    foreach (dynamic i in query)
    {
        if (!started)
        {
            response.ContentType = "application/json";
            response.Cache.SetCacheability(HttpCacheability.NoCache);
 
            responseStream.Write(_jsonPrefix, 0, _jsonPrefix.Length);
            started = true;
        }
        else
        {
            responseStream.WriteByte((byte)',');
        }
 
        bytesOut += i.ToJson(responseStream);
        DateTime now = DateTime.UtcNow;
        if ((bytesOut > _flushWhen) || ((now - lastFlush) > _flushInterval))
        {
            response.Flush();
            lastFlush = now;
            bytesOut = 0;
        }
    }
 
    if (started)
        responseStream.Write(_jsonPostfix, 0, _jsonPostfix.Length);
 
    return true;
}


The ProcessRequest function first calls GetQuery to parse the URL into a query. Then, if the query was created successfully, the foreach loop executes it, which produces entities from the real-time stream. At this point we receive an unserialized entity. I extended the entity’s API with a function called ToJson(), which returns the entity serialized as JSON in a UTF-8-encoded byte array. The key is that ToJson() returns a cached copy of that serialization, so the work only happens once; this works because all clients expect JSON. If I were also supporting Atom, I would need to look at the request’s Accept header to determine which format is requested. Inside the loop, the data is sent simply by writing the cached JSON bytes to the response stream.
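
If that check were needed, it might look roughly like this (my sketch, not part of the original code; anything that doesn’t accept JSON would be handed back to the default pipeline by returning false from ProcessRequestManually):

// Only JSON responses are cached, so bail out to the default ADO.NET handling
// for clients that explicitly ask for another format (e.g. Atom).
bool wantsJson = request.AcceptTypes == null ||
    request.AcceptTypes.Any(t => t.IndexOf("json", StringComparison.OrdinalIgnoreCase) >= 0);
if (!wantsJson)
    return false;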

I left out the details of the code behind ToJson(), but it makes a straightforward conversion from the entity to JSON and caches the result with the entity. The only gotcha is that clients expect the OData metadata for each entity, so the serializer must create it (which means you can’t simply use the DataContractJsonSerializer). This amounts to adding a JSON object as the first nested object in the entity that looks like the following (a rough sketch of the cached ToJson() itself appears after this snippet):

"__metadata": {
    "uri": "http://yourservice/QuoteStream.ashx/Quotes",
    "type": "Quote"
}
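
To make the idea concrete, here is a rough sketch of what a cached ToJson() might look like. This is not the actual serializer from my production code: the property names follow the Quote examples in this post, the entity is assumed to be a partial class, and a real serializer must also escape string values.

// A sketch only: serialize the entity once, cache the UTF-8 bytes with the entity,
// and write the cached bytes on every subsequent call.
public partial class Quote
{
    private byte[] _cachedJson;
 
    public int ToJson(Stream output)
    {
        if (_cachedJson == null)
        {
            var sb = new StringBuilder();
            sb.Append("{\"__metadata\":{\"uri\":\"http://yourservice/QuoteStream.ashx/Quotes\",\"type\":\"Quote\"},");
            sb.AppendFormat(CultureInfo.InvariantCulture,
                "\"Symbol\":\"{0}\",\"Price\":{1},\"Delta\":{2}}}", Symbol, Price, Delta);
            _cachedJson = Encoding.UTF8.GetBytes(sb.ToString());
        }
        output.Write(_cachedJson, 0, _cachedJson.Length);
        return _cachedJson.Length;   // the caller uses this count to decide when to flush
    }
}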

Also, _jsonPrefix and _jsonPostfix wrap the entire sequence of entities in a JSON array named “d”, and the loop adds a comma between entities as expected for a JSON array, resulting in:  { "d": [ entity0, entity1 ] }.  Each entity is a JSON object with OData metadata. If you’re wondering why writing _jsonPrefix is delayed until inside the loop instead of happening before it, it’s due to a subtle gotcha deep within our processing: sometimes executing a query triggers an HTTP redirect to a different server, and if any data has been output before then, the redirect fails. Since the query does not begin executing until the foreach statement, the first output is postponed until then.

Automatic flushing of the output stream is disabled so that I can control when the stream gets flushed, doing so after either a certain number of bytes or a certain amount of elapsed time. This lets the performance of the stream be tuned slightly; IIS performs HTTP chunking at each flush point.

So far this hasn’t been too much work, and it’s pretty straightforward; the only tricky part is writing a custom JSON serializer that emits the OData format (unfortunately, although .NET has one built into its DataService, it does not expose it for public use). But writing JSON is not too difficult. The only detail left undone is parsing the request URL into a queryable object. Again, .NET has code to do this but has deftly kept it private.

The role of GetQuery is to convert the request from the OData URL syntax into a LINQ query. Since .NET hides this functionality, you can get there by rewriting the URL into its C# equivalent and then using the Dynamic LINQ library to do the conversion. Dynamic LINQ is a library whose source code is provided by Microsoft as part of the C# sample pack on MSDN.

For example, suppose the client side issues the following query:

var results = ( from quote in quoteStream.Quotes.Expand("Company")
    where quote.Symbol == "IBM" && quote.Delta > 0.1
    select quote).Take(5);

The client data service converts this to a URL query:

http://yourservice/QuoteStream.ashx/Quotes()?$filter=(Symbol eq 'IBM') and (Delta gt 0.1)&$top=5&$expand=Company

In order to rebuild the query, the URL filter arguments must be converted to C# syntax for parsing by Dynamic LINQ.  In this example, the $filter component is converted as a string from "(Symbol eq 'IBM') and (Delta gt 0.1)" to "(Symbol == "IBM") && (Delta > 0.1)".  Dynamic LINQ extends the Where function to accept a string argument that it interprets and compiles into a queryable object.  If it fails to parse, it throws an exception that I catch, so that I may fall back on the default ADO.NET implementation to handle the query.  The $top argument is also parsed and can be applied straightforwardly to the query via its Take function (see the short sketch below).  For now, the $expand argument is simply ignored.
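
As a concrete illustration, once the filter string has been rewritten, applying it with the Dynamic LINQ extensions looks roughly like this (a sketch, assuming the Dynamic LINQ sample’s extension methods are in scope and service.Quotes is the source from the example above):

// Dynamic LINQ adds string-based Where/Take overloads on the non-generic IQueryable.
IQueryable quotes = service.Quotes;
IQueryable filtered = quotes
    .Where("(Symbol == \"IBM\") && (Delta > 0.1)")   // rewritten $filter
    .Take(5);                                        // rewritten $top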

GetQuery, the last piece of the puzzle, is the code that handles this conversion by doing a series of string replacements on the $filter argument. For example, the regexes below rewrite endswith(Symbol,'BM') into Symbol.EndsWith("BM") and substringof('IB',Symbol) into Symbol.Contains("IB"). The following code all resides inside the ManualQueryExecutor class:

static RegexSearch[] _regexSearches = new RegexSearch[] {
    new RegexSearch { regex= new Regex(@"endswith\((?<obj>.*?),'(?<lit>.*?)'\)", RegexOptions.Compiled), funcToReplace= "EndsWith" },
    new RegexSearch { regex= new Regex(@"substringof\('(?<lit>.*?)',(?<obj>.*?)\)", RegexOptions.Compiled), funcToReplace= "Contains" }
};
 
static Regex _longIntMatch = new Regex(@"(\d+)L", RegexOptions.Compiled);
 
class RegexSearch
{
    public Regex regex;
    public string funcToReplace;
}
 
struct StringMatcher
{
    public int start;
    public int stop;
    public string replace;
}
 
static private void StringMatch(string str, RegexSearch rgx, List<StringMatcher> lsm)
{
    Match m = rgx.regex.Match(str);
 
    while (m.Success)
    {
         string replace = String.Format("{0}.{1}(\"{2}\")", m.Groups["obj"], rgx.funcToReplace, m.Groups["lit"]);
         lsm.Add(new StringMatcher { start = m.Index, stop = m.Index + m.Length, replace = replace });
         m = m.NextMatch();
     }
}
 
static private void StringMatching(StringBuilder sb)
{
    string origStr = sb.ToString();
    List<StringMatcher> lsm = new List<StringMatcher>();
 
    foreach (var rgxSearch in _regexSearches)
        StringMatch(origStr, rgxSearch, lsm);
 
    // sort by position so replacements gathered from different regexes are applied in order
    lsm.Sort((a, b) => a.start.CompareTo(b.start));
 
    if (lsm.Count > 0)
    {
        // reset the string to rebuild it with the replacements
        sb.Length = 0;
        int loc = 0;
        foreach (var lsmi in lsm)
        {
            sb.Append(origStr.Substring(loc, lsmi.start - loc));
            loc = lsmi.stop;
            sb.Append(lsmi.replace);
        }
        sb.Append(origStr.Substring(loc));
    }
}
 
static private IQueryable GetQuery(IQueryable query, HttpRequest request)
{
    string filterString = request.QueryString["$filter"];
    if (!String.IsNullOrEmpty(filterString))
    {
        StringBuilder sb = new StringBuilder(filterString);
        StringMatching(sb);
        sb.Replace(" eq ", " == ");
        sb.Replace(" ne ", " != ");
        sb.Replace(" ge ", " >= ");
        sb.Replace(" gt ", " > ");
        sb.Replace(" lt ", " < ");
        sb.Replace(" le ", " <= ");
        sb.Replace(" and ", " && ");
        sb.Replace(" or ", " || ");
        sb.Replace("not ", "!");
        sb.Replace(" mod ", " % ");
        sb.Replace('\'', '"');
 
        // must replace / with . if they are not within quotes.
        if (sb.ToString().Contains('/'))
        {
            bool inQuotes = false;
            List<int> indices = new List<int>();
            for (int i = 0; i < sb.Length; i++)
            {
                if (sb[i] == '"')
                    inQuotes = !inQuotes;
                else if (sb[i] == '/' && !inQuotes)
                    indices.Add(i);
            }
 
            if (indices.Count > 0)
                foreach (int i in indices)
                    sb.Replace('/', '.', i, 1);
        }
 
        string s = _longIntMatch.Replace(sb.ToString(), "$1");
 
        // This is using the DynamicQueryable extension to compile the query
        try
        {
            query = query.Where(s);
        }
        catch (System.Linq.Dynamic.ParseException)
        {
            Trace.TraceWarning("Manual query not handled: {0} -> {1}", request.QueryString, s);
            query = null;
        }
    }
 
    if (query != null)
    {
        string topString = request.QueryString["$top"];
        if (!String.IsNullOrEmpty(topString))
            query = query.Take(int.Parse(topString));
    }
 
    return query;
}

A warning: the above code is ugly (as you probably already noticed), and I’m a bit embarrassed to present it: it is error prone, probably performs poorly, and is inelegant. Nevertheless, I felt I had better show it; otherwise the guts of this solution would be missing. I don’t really recommend doing it this way; a better way would be to write a proper parser or more sophisticated regexes. However, this gets the job done by brute-force replacing the URL syntax until it conforms to C# syntax.

At least part of the potential performance problem is addressed by caching the computation of the query. Since many clients connect using the same queries, keeping a hash table from $filter arguments to the resulting queries avoids recomputing most of them. The production code uses this cache, but I removed it here to keep the code shorter; a much-simplified sketch of the idea follows.
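
In this sketch (my simplification, not the production code), only the rewritten filter string is memoized; RewriteFilter is a hypothetical stand-in for the string conversion performed inside GetQuery, and the production code goes further by caching the resulting query itself:

// Memoize the $filter -> rewritten C# expression so repeated queries skip the
// regex and string-replacement work. ConcurrentDictionary keeps the cache safe
// across simultaneous client connections.
static readonly ConcurrentDictionary<string, string> _filterCache =
    new ConcurrentDictionary<string, string>();
 
static string RewriteFilterCached(string filterString)
{
    return _filterCache.GetOrAdd(filterString, f => RewriteFilter(f));
}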

The Downside

This approach of controlling the serialization of entities, so that the serialized strings can be cached and reused across many clients, really helped the overtaxed servers. When deployed to production, the CPUs were no longer saturated and the servers were able to handle more clients. However, the code as described above has a couple of limitations: it ignores, and essentially disables, expansion of entities and projection of properties. I’ll briefly describe each limitation.

Expansion lets you request the nested components of an entity. In the example above, the query said quoteStream.Quotes.Expand("Company"), meaning that in addition to the Quote object it would like the Company object associated with that quote. The Quote has a nested entity called Company; without expansion, the quote is sent with a null reference for the company information, and by explicitly expanding it, the company data comes along with the quote. In my initial implementation of the JSON serializer, I automatically wrote all the data, so each pre-serialized entity already contained everything that could be expanded. This solved the problem of $expand being ignored, because you always received the data whether you expanded it or not. The downside is that clients not needing the data got it anyway, increasing the bandwidth required for their connections; I was wasting network on data that was being tossed away.

Eventually I added expansion capability back by changing the way the JSON serializer worked. Instead of writing out the whole string, I wrote chunks with breaks where entities could be expanded. In those breaks, I can choose to insert either the expanded entity (also already serialized) or a placeholder (unexpanded entities are replaced with an object that describes them; see the OData spec referenced above for more info). This complicated things a bit: I must assemble a chain of buffers for each entity on a per-query basis and deal with arbitrarily deep nesting of entities, but the bulk of the serialization work is still done before the query executes, so it still comes out a win. This JSON serializer is probably worth a whole blog entry if I get around to it.
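
For reference, such a placeholder in the verbose OData JSON format is a deferred link, roughly of this form (the URI here is illustrative):

"Company": {
    "__deferred": {
        "uri": "http://yourservice/QuoteStream.ashx/Quotes('IBM')/Company"
    }
}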

As for projection, it is still disabled.  Projection has the nice quality that you can further reduce the bandwidth by selecting only properties you are interested in.  For example, the query above could be rewritten to only send the price:

var results = ( from quote in quoteStream.Quotes
    where quote.Symbol == "IBM" && quote.Delta > 0.1
    select new Quote { Price = quote.Price } ).Take(5);

Instead of sending entire quotes, it creates new ones with only a Price property, so only the price data is sent over the wire. Although this saves bandwidth on the connection, we found that projection was very expensive on the server side; clients trying to keep up with real-time data streams were falling behind when using projection queries. Therefore, for now I have made no effort to support projection and simply ignore it, sending the entire entity. Clients will receive more data than they asked for, but they can discard it.

Conclusion

In this blog entry I described a pretty hacky technique for bypassing some of the functionality of ADO.NET in order to control serialization. This allowed me to serialize each entity once, so that its cached result could be reused for multiple clients. It was a fun project to implement and worthwhile, as it improved the performance of my servers. Unfortunately, it duplicates a lot of functionality that ADO.NET already implements, such as URL parsing and OData JSON formatting. It would’ve been nice if ADO.NET exposed some of this functionality, or provided simple hooks for custom serialization (or just did the caching on its own!).

