Wednesday, October 12, 2016

Asynchronous handlers in nginx-haskell-module

My journey into learning how to make haskell and nginx cooperate keeps going. Below is the latest chapter, "Asynchronous tasks with side effects", from the README of nginx-haskell-module.

All variable handlers we have met so far were pure haskell functions without side effects. The absence of side effects matters because it gives strong guarantees about how long the functions run. In haskell, functions that may produce side effects are normally wrapped inside the IO monad. They can do various non-deterministic IO computations like reading or writing files, connecting to network servers etc., which, in principle, may last unpredictably long or even forever. Despite this, having IO functions as nginx variable handlers is extremely tempting, as it makes it possible to perform arbitrary IO tasks during an HTTP request. To eliminate the downside of their unpredictable duration, they could be run asynchronously in green threads provided by the haskell RTS library, and somehow signal the nginx worker's main thread after their computations finish. This is exactly what happens in the special handler NGX_EXPORT_ASYNC_IOY_Y. Consider the following example.
user                    nobody;
worker_processes        2;

events {
    worker_connections  1024;
}

http {
    default_type        application/octet-stream;
    sendfile            on;

    haskell compile threaded standalone /tmp/ngx_haskell.hs '

import qualified Data.ByteString.Char8 as C8
import qualified Data.ByteString.Lazy.Char8 as C8L
import           Network.HTTP.Client
import           Control.Concurrent
import           Control.Exception
import           Safe

catchHttpException = (`catch` \e ->
        return $ C8L.pack $ "HTTP EXCEPTION: " ++ show (e :: HttpException))

getResponse (C8.unpack -> url) = fmap responseBody . (parseRequest url >>=)

getUrl url = do
    man <- newManager defaultManagerSettings
    catchHttpException $ getResponse url $ flip httpLbs man
NGX_EXPORT_ASYNC_IOY_Y (getUrl)

threadDelaySec = threadDelay . (* 10^6)

delay (readDef 0 . C8.unpack -> v) =
    threadDelaySec v >> return (C8L.pack $ show v)
NGX_EXPORT_ASYNC_IOY_Y (delay)

    ';

    server {
        listen       8010;
        server_name  main;
        error_log    /tmp/nginx-test-haskell-error.log;
        access_log   /tmp/nginx-test-haskell-access.log;

        location / {
            haskell_run_async getUrl $hs_async_ya
                    "http://ya.ru";
            haskell_run_async getUrl $hs_async_httpbin
                    "http://httpbin.org";
            haskell_run_async getUrl $hs_async_hackage
                    "http://hackage.haskell.org";
            echo "------> YA.RU:\n\n$hs_async_ya\n";
            echo "------> HTTPBIN.ORG:\n\n$hs_async_httpbin\n";
            echo "------> HACKAGE.HASKELL.ORG:\n\n$hs_async_hackage";
        }

        location /rewrite {
            rewrite ^ / last;
        }

        location /delay {
            haskell_run_async delay $hs_async_elapsed $arg_a;
            echo "Elapsed $hs_async_elapsed seconds";
        }
    }
}
Notice that the haskell code was compiled with the flag threaded, which is important for running asynchronous tasks. Function getUrl is an HTTP client that returns the response body, or a special message if an HTTP exception has happened. Inside location / there are 3 haskell_run_async directives that spawn 3 asynchronous tasks run by getUrl and bind their future results to 3 different variables, which are accessed later by the echo directives in the nginx content phase.

Async variable handlers are very special. The IO task gets spawned even if the bound variable is not accessed anywhere. All tasks are spawned either during the early nginx rewrite phase (before all rewrite directives) or during the late rewrite phase (when all location rewrites are done: this ensures that all tasks in the final rewritten location will run). The request won't proceed to later phases until all async tasks are done.

Technically, an async task signals the main nginx thread when it finishes by writing a byte into the write-end file descriptor of a dedicated self-pipe. The read-end file descriptor of the pipe is polled by the nginx event poller (normally epoll on Linux). When a task finishes, the poller calls a special callback that checks whether there are more async tasks for this request, and either spawns the next one or finishes the rewrite phase handler by returning NGX_DECLINED.

All types of exceptions are caught inside async handlers. If an exception happens, the async handler writes its message into the bound variable's data, and the variable handler logs it when the variable is accessed. However, for better control, you may want to catch exceptions inside your own code, as getUrl does. Let's do some tests.
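Before running them, the self-pipe signalling described above can be pictured with a small standalone sketch. It is not the module's implementation (that lives in C inside nginx): here a blocking read on the pipe stands in for the nginx event poller, an IORef stands in for the bound variable's data, and the names spawnTask and runAsync are made up for illustration. It assumes the process package (shipped with GHC) for createPipe.

```haskell
module Main where

import Control.Concurrent (forkIO, threadDelay)
import Control.Exception (SomeException, try)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Data.Maybe (fromMaybe)
import System.IO (BufferMode (NoBuffering), Handle, hClose, hGetChar,
                  hPutChar, hSetBuffering)
import System.Process (createPipe)

-- | Run a task in a green thread, store its result (or the text of a
-- caught exception) in the slot, then wake up the waiting side by
-- writing a single byte into the write end of the pipe.
spawnTask :: Handle -> IORef (Maybe String) -> IO String -> IO ()
spawnTask writeEnd slot task = do
    _ <- forkIO $ do
        r <- try task :: IO (Either SomeException String)
        writeIORef slot $ Just $ either (\e -> "EXCEPTION: " ++ show e) id r
        hPutChar writeEnd '!'
    return ()

-- | Stand-in for the event loop: spawn the task, wait until a byte
-- arrives on the read end of the pipe, then pick up the stored result.
runAsync :: IO String -> IO String
runAsync task = do
    (readEnd, writeEnd) <- createPipe
    hSetBuffering writeEnd NoBuffering   -- deliver the byte immediately
    slot <- newIORef Nothing
    spawnTask writeEnd slot task
    _ <- hGetChar readEnd                -- returns once the task has signalled
    hClose readEnd
    hClose writeEnd
    fromMaybe "" <$> readIORef slot

main :: IO ()
main = do
    -- A task that succeeds after a short delay.
    runAsync (threadDelay 100000 >> return "done") >>= putStrLn
    -- A task that throws: the exception text ends up in the result.
    runAsync (ioError (userError "boom")) >>= putStrLn
```

The point of the design is that the waiting side never inspects the task itself: it only reacts to a readable file descriptor, which is something an event poller already knows how to do. With that picture in mind, the first test is a single request to location /.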
curl 'http://localhost:8010/'
The output is too long to show here: it contains the content of all 3 sites. Let's run 20 requests simultaneously.
for i in {1..20} ; do curl -s 'http://localhost:8010/' & done
The output is 20 times longer! Now let's make a 20-second timer out of 20 parallel requests.
for i in {1..20} ; do curl -s "http://localhost:8010/delay?a=$i" & done
Elapsed 1 seconds
Elapsed 2 seconds
Elapsed 3 seconds
Elapsed 4 seconds
Elapsed 5 seconds
Elapsed 6 seconds
Elapsed 7 seconds
Elapsed 8 seconds
Elapsed 9 seconds
Elapsed 10 seconds
Elapsed 11 seconds
Elapsed 12 seconds
Elapsed 13 seconds
Elapsed 14 seconds
Elapsed 15 seconds
Elapsed 16 seconds
Elapsed 17 seconds
Elapsed 18 seconds
Elapsed 19 seconds
Elapsed 20 seconds
Make sure that a new line is printed every second: this shows that the requests are processed asynchronously! In the second test we ran 20 HTTP requests simultaneously, but we could run hundreds or thousands. Some servers may reject so many requests at once, even though the manager from Network.HTTP.Client is advanced enough to share connections to the same host between all requests, provided it is defined at the top level like this:
httpManager = unsafePerformIO $ newManager defaultManagerSettings
{-# NOINLINE httpManager #-}

getUrl url = catchHttpException $ getResponse url $ flip httpLbs httpManager
Fortunately, we can limit the number of simultaneous requests with semaphores. Let's make a semaphore that allows only 1 task at a time.
sem1 = unsafePerformIO $ S.new 1
{-# NOINLINE sem1 #-}
Functions unsafePerformIO and new must be imported from modules System.IO.Unsafe and Control.Concurrent.MSem (the latter qualified as S) respectively. This code looks ugly, but it is safe and works as expected in our new async handlers getUrl1 and delay1.
getUrl1 url = do
    man <- newManager defaultManagerSettings
    catchHttpException $ getResponse url $ S.with sem1 . flip httpLbs man
NGX_EXPORT_ASYNC_IOY_Y (getUrl1)

delay1 (readDef 0 . C8.unpack -> v) =
    S.with sem1 (threadDelaySec v) >> return (C8L.pack $ show v)
NGX_EXPORT_ASYNC_IOY_Y (delay1)
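The effect of a one-slot semaphore on concurrent tasks can also be checked outside nginx. The sketch below is a standalone approximation, not the handlers above: it swaps Control.Concurrent.MSem for QSem from base so that it compiles without extra packages, and the helpers withSem and maxConcurrency are hypothetical names introduced only for this illustration. It starts several green threads that all pass through the same top-level semaphore and reports the largest number of tasks that were inside the guarded section at the same time.

```haskell
module Main where

import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (modifyMVar, modifyMVar_, newEmptyMVar,
                                newMVar, putMVar, readMVar, takeMVar)
import Control.Concurrent.QSem (QSem, newQSem, signalQSem, waitQSem)
import Control.Exception (bracket_)
import Control.Monad (forM)
import System.IO.Unsafe (unsafePerformIO)

-- | Top-level semaphore with a single slot, created once on first use.
-- NOINLINE keeps GHC from duplicating the unsafePerformIO call.
sem1 :: QSem
sem1 = unsafePerformIO $ newQSem 1
{-# NOINLINE sem1 #-}

-- | Hold the semaphore while the action runs; release it on exceptions too.
withSem :: QSem -> IO a -> IO a
withSem s = bracket_ (waitQSem s) (signalQSem s)

-- | Start n green threads that all go through sem1 and return the peak
-- number of tasks observed inside the guarded section simultaneously.
maxConcurrency :: Int -> IO Int
maxConcurrency n = do
    active <- newMVar (0 :: Int)
    peak   <- newMVar (0 :: Int)
    dones  <- forM [1 .. n] $ \_ -> do
        done <- newEmptyMVar
        _ <- forkIO $ do
            withSem sem1 $ do
                now <- modifyMVar active $ \a -> return (a + 1, a + 1)
                modifyMVar_ peak (return . max now)
                threadDelay 10000            -- pretend to do some IO
                modifyMVar_ active (return . subtract 1)
            putMVar done ()
        return done
    mapM_ takeMVar dones                     -- wait for all tasks
    readMVar peak

main :: IO ()
main = maxConcurrency 5 >>= print            -- prints 1
```

Creating the semaphore with newQSem 2 instead would let the peak grow to 2, which is the knob to turn when one task at a time is too strict.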
Put getUrl1 and delay1 in locations / and /delay and run the 20-request tests again to see how they change the async behavior. For example, responses from location /delay should now take as long as if the tasks were not run asynchronously, and they will not necessarily finish in order. Be aware that sem1 is shared between all async handlers that use it, which means that simultaneous requests to locations / and /delay will probably wait for each other: use different semaphores for different handlers when this is not desirable.

Starting an async task that normally returns an identical result on every new request may be unnecessarily expensive. In the above example, function getUrl presumably returns the same value over a long period of time (days, months or even years). For this case there is another handler, NGX_EXPORT_SERVICE_IOY_Y, that runs an async task as a service. Let's put the following service function inside our haskell code.
getUrlService url firstRun = do
    unless firstRun $ threadDelaySec 20
    getUrl url
NGX_EXPORT_SERVICE_IOY_Y (getUrlService)
(Function unless additionally requires importing module Control.Monad.) Function getUrlService accepts two arguments; the second is a boolean value that denotes whether the service runs for the first time: it is supposed to be used to skip threadDelay on the first run. Using threadDelay in a service task is very important, because without any delay nginx will restart the task very often. Let's start getUrlService.
    haskell_run_service getUrlService $hs_service_ya "http://ya.ru";
    haskell_run_service getUrlService $hs_service_httpbin "http://httpbin.org";
Directives haskell_run_service must be placed in the http clause of the nginx configuration after directive haskell compile. In contrast with other types of handlers, service handlers cannot refer to variables in their arguments, because nginx variable handlers always refer to a request, which does not exist here. Put locations for showing the data collected by the services and we are done.
        location /ya {
            echo $hs_service_ya;
        }

        location /httpbin {
            echo $hs_service_httpbin;
        }
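The life cycle of a service task (run, publish the result, get restarted with the first-run flag cleared) can be mimicked in plain haskell. The following is only a rough model under stated assumptions: the real restart loop belongs to the module and runs indefinitely, whereas the hypothetical runService below stops after a fixed number of rounds so that the program terminates, and counterService is a made-up service that has the same argument shape as getUrlService but counts its runs instead of fetching a URL.

```haskell
module Main where

import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (unless)
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef,
                   writeIORef)

-- | A made-up service: sleeps on every run except the first one,
-- then returns its name together with the run number.
counterService :: IORef Int -> String -> Bool -> IO String
counterService counter name firstRun = do
    unless firstRun $ threadDelay 50000      -- skip the delay on first run
    n <- atomicModifyIORef' counter $ \c -> (c + 1, c + 1)
    return $ name ++ " #" ++ show n

-- | Stand-in for the restart loop: run the service a fixed number of
-- times, pass True only on the first run, and publish each result.
runService :: Int -> IORef String -> (Bool -> IO String) -> IO ()
runService rounds slot service = go rounds True
  where
    go 0 _ = return ()
    go k firstRun = do
        service firstRun >>= writeIORef slot -- readers see the latest value
        go (k - 1) False

main :: IO ()
main = do
    counter <- newIORef 0
    slot    <- newIORef ""
    done    <- newEmptyMVar
    -- The service lives in its own green thread, like an async task.
    _ <- forkIO $ do
        runService 3 slot (counterService counter "svc")
        putMVar done ()
    takeMVar done
    readIORef slot >>= putStrLn              -- prints "svc #3"
```

Readers of the slot always get the most recent published value without waiting for the next run, which is the behavior the locations above rely on when they echo the service variables.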
Complex scenarios may require synchronous access to handlers with side effects. An example is an ad-hoc error_page redirection loop, for which asynchronous handlers are not well suited. For such cases another handler, NGX_EXPORT_IOY_Y, may be useful. Below is a toy example of a synchronous handler declaration.
getIOValue = const $ return $ C8L.pack "HELLO WORLD!"
NGX_EXPORT_IOY_Y (getIOValue)
You can find all the examples shown here in file test/tsung/nginx-async.conf.