Blocking Operations, Exceptions, and Logging in Asynchronous Servers


Doug Hoyte

Fractal

Motivation

AnyEvent::Task

Many operations can't easily be done by async apps because they are blocking

Callback::Frame

Log::Defer

AnyEvent::Task

AnyEvent::Task Server/Client

Server:
use AnyEvent::Task::Server;

AnyEvent::Task::Server->new(
  listen => ['unix/', '/tmp/arithmetic.socket'],
  interface => {
                 add => sub { $_[0] + $_[1] },
                 mult => sub { $_[0] * $_[1] },
               },
)->run;
  
Client:
use AnyEvent::Task::Client;

my $client = AnyEvent::Task::Client->new(
               connect => ['unix/',
                           '/tmp/arithmetic.socket'],
             );
  

Using AnyEvent::Task Client

The client object has a checkout method that returns a checkout object representing exclusive access to a worker process.

The checkout object has methods that correspond to the interface methods of the server.

  my $checkout = $client->checkout;

  $checkout->add(5, 10, sub {
    my ($checkout, $result) = @_;

    say "5 + 10 = $result";
  });
  
After $checkout goes out of scope, the worker is returned to the client's pool.
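
The scope-based release described here can be illustrated with a toy model in core Perl. This is only a sketch of the general idiom (an object whose DESTROY method hands a resource back), not AnyEvent::Task's actual internals. ToyPool, ToyCheckout, and the worker names are hypothetical.

```perl
use strict;
use warnings;

# Toy pool and checkout classes (hypothetical; not AnyEvent::Task internals).
package ToyPool {
  sub new { my ($class, @workers) = @_; bless { idle => [@workers] }, $class }

  sub checkout {
    my ($self) = @_;
    my $worker = shift @{ $self->{idle} };
    die "no idle workers\n" unless defined $worker;
    return ToyCheckout->new($self, $worker);
  }

  sub idle_count { scalar @{ $_[0]{idle} } }
}

package ToyCheckout {
  sub new {
    my ($class, $pool, $worker) = @_;
    bless { pool => $pool, worker => $worker }, $class;
  }

  # Perl calls DESTROY when the last reference to the object goes away,
  # which is where the worker is handed back to the pool.
  sub DESTROY {
    my ($self) = @_;
    push @{ $self->{pool}{idle} }, $self->{worker};
  }
}

package main;

my $pool = ToyPool->new('worker-1', 'worker-2');

my $idle_during;
{
  my $checkout = $pool->checkout;   # takes one worker out of the pool
  $idle_during = $pool->idle_count;
}                                   # $checkout leaves scope here

my $idle_after = $pool->idle_count;

print "idle during checkout: $idle_during, after: $idle_after\n";
```

Because a callback that receives $checkout holds a reference to it, the worker stays checked out until that callback is finished with it too.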

Nested

  $client->checkout->add(3, 4, sub {
    my ($checkout, $result) = @_;

    $checkout->mult($result, $result, sub {
      my ($checkout, $result) = @_;

      say "(3 + 4)^2 = $result";
    });
  });
  

Pipelining

  my $checkout = $client->checkout;

  my $x = $checkout->add(3, 4);

  $checkout->mult($x, $x, sub {
    my ($checkout, $result) = @_;

    say "(3 + 4)^2 = $result";
  });  
  
Note that $x is a promise object here, not the value 7

Advantages of Pipelining

Infectious Promises

You can call methods on promises, de-reference them, etc:

my $dbh = $client->checkout;

## $row is a promise
my $row = $dbh->selectrow_arrayref(q{ SELECT user_id,email
                                      FROM user WHERE username = ? },
                                   undef, $username);

my $user_id = $row->[0]; # $user_id is a promise
my $email = $row->[1]; # $email is a promise

$dbh->do(q{ DELETE FROM user WHERE email = ? AND user_id != ? },
         undef, $email, $user_id);

$dbh->commit(sub {
  ## All done
});
  

Callback::Frame

Broken Async Error Handling

use AnyEvent;
 
eval {
  $watcher = AE::timer 0.1, 0,
    sub {
      die "some error";
    };
};
 
## broken! The eval block has already exited by the time the timer fires
if ($@) {
  print STDERR "Oops: $@";
}
 
AE::cv->recv;
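
To see why the eval above cannot catch the error, here is a minimal core-Perl sketch that replaces the event loop with a plain queue. The schedule and run_loop helpers are hypothetical stand-ins for a timer watcher and the loop, not AnyEvent functions.

```perl
use strict;
use warnings;

# Toy stand-in for an event loop: callbacks are queued now and run later.
my @queue;

sub schedule { my ($cb) = @_; push @queue, $cb }

sub run_loop {
  my @errors;
  while (my $cb = shift @queue) {
    # Errors surface here, inside the loop, not where the callback was created.
    eval { $cb->(); 1 } or push @errors, $@;
  }
  return @errors;
}

# This eval only covers registering the callback, not running it.
my $registration_error = '';
eval {
  schedule(sub { die "some error\n" });
  1;
} or $registration_error = $@;

# The callback actually runs here, long after the eval block has exited.
my @loop_errors = run_loop();

print "caught at registration: ",
      ($registration_error eq '' ? "nothing" : $registration_error), "\n";
print "raised inside the loop: $loop_errors[0]";
```

The error only becomes visible inside the loop, far from the code that set up the callback and knows how to handle it.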
  

Error Handling with Callback::Frame

use AnyEvent;
use Callback::Frame;
 
frame_try {
  $watcher = AE::timer 0.1, 0, fub {
                                 die "some error";
                               };
} frame_catch {
  print STDERR "Oops: $@";
};
 
AE::cv->recv;
  

Coping with in-band errors

use AnyEvent::HTTP;
use Callback::Frame;

frame_try {
  $req = http_get "http://google.com", fub {
    my ($body, $hdr) = @_;

    ## Handle errors elsewhere:
    die "request failed" if $hdr->{Status} !~ /^2/;

    print "Request OK\n";
  };
} frame_catch {
  print "Handling error: $@\n";
};

AE::cv->recv;
  

Log::Defer

I believe a lot of log processing is done too early

Structured Logging

File format is message-per-line, minified JSON:
{"logs":[[0.000158,30,"this is an info message"],[0.000192,20,"some kind of warning!"],[0.200223,10,"There was a serious error: unable to connect to network"]],"timers":[["parse req",0.000224,0.000239],["DB lookup",0.0003,0.152386],["connecting",0.153,0.201],["building resp",0.203,0.204386]],"end":0.205386,"start":1403408091.93565}
  
Rendered with log-defer-viz:
$ log-defer-viz file.log

------ 2014-06-21 Sat 23:34:51.93565 EDT (1403408091.93565) ------
  | 0.000158 [ INFO] this is an info message
  | 0.000192 [ WARN] some kind of warning!
  | 0.200223 [ERROR] There was a serious error: unable to connect to network
  |_0.205386 [END]

     parse req X
     DB lookup |=====================================================|
    connecting                                                       |================|
 building resp                                                                         X
_______________________________________________________________________________________________
times in ms    0.2                                                   153.0            203.0
                                                                     152.4            201.0
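
Since each record is one JSON object per line, it can also be post-processed with a few lines of Perl. The sketch below uses the core JSON::PP module and assumes the field layout visible in the sample record: log entries are [offset, level, message] and timers are [name, start, end]. The level names are inferred from the rendered output above.

```perl
use strict;
use warnings;
use JSON::PP;

# One record, copied from the sample above (one JSON object per line).
my $line = '{"logs":[[0.000158,30,"this is an info message"],[0.000192,20,"some kind of warning!"],[0.200223,10,"There was a serious error: unable to connect to network"]],"timers":[["parse req",0.000224,0.000239],["DB lookup",0.0003,0.152386],["connecting",0.153,0.201],["building resp",0.203,0.204386]],"end":0.205386,"start":1403408091.93565}';

my $rec = decode_json($line);

# Level names inferred from the rendered output, not from a spec.
my %level_name = (10 => 'ERROR', 20 => 'WARN', 30 => 'INFO');

# Print each log message with its offset from the start of the request.
for my $entry (@{ $rec->{logs} }) {
  my ($offset, $level, $message) = @$entry;
  printf "%.6f [%5s] %s\n", $offset, $level_name{$level} // $level, $message;
}

# Compute how long each timer ran, in milliseconds.
my %duration_ms;
for my $timer (@{ $rec->{timers} }) {
  my ($name, $start, $end) = @$timer;
  $duration_ms{$name} = ($end - $start) * 1000;
  printf "%-14s %8.3f ms\n", $name, $duration_ms{$name};
}
```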
  

log-defer-viz

Muffle warnings:
 $ log-defer-viz --nowarn *.log 
Follow log file's tail:
 $ log-defer-viz -f file.log 
Do-What-I-Mean with compressed files:
 $ log-defer-viz file.log.gz file2.log.bz2 
View times in UTC (instead of local time):
 $ log-defer-viz --tz UTC file.log 
Sort and merge logs from multiple servers:
 $ log-defer-viz --sort-time www*/access.2014-02-12.log 

grep and map

Only show requests that took longer than 500ms:
 $ log-defer-viz --grep '$_->{end} > 0.5' file.log 
JSON format can be pre-filtered with plain grep for performance:
 $ grep 192.168.0.77 file.log | log-defer-viz
Extract usernames from data section, tally occurrences:
 $ log-defer-viz --map '$_->{data}{username}' *.log \
   | sort | uniq -c | sort -rn
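
The --grep and --map arguments above read as Perl expressions applied to each decoded record in $_. Assuming that is how they are evaluated, the same filtering and tallying can be sketched in plain Perl with core JSON::PP. The three records below are hypothetical and exist only for illustration.

```perl
use strict;
use warnings;
use JSON::PP;

# Hypothetical records for illustration; real ones come from the log file.
my @lines = (
  '{"start":1403408091.9,"end":0.205386,"data":{"username":"alice"}}',
  '{"start":1403408092.4,"end":0.731002,"data":{"username":"bob"}}',
  '{"start":1403408093.1,"end":0.912450,"data":{"username":"alice"}}',
);

my @records = map { decode_json($_) } @lines;

# Same test as: --grep '$_->{end} > 0.5'
my @slow = grep { $_->{end} > 0.5 } @records;

# Same extraction as: --map '$_->{data}{username}', then tallied
# the way "sort | uniq -c | sort -rn" would.
my %tally;
$tally{ $_->{data}{username} }++ for @records;

printf "%d slow request(s)\n", scalar @slow;
printf "%7d %s\n", $tally{$_}, $_
  for sort { $tally{$b} <=> $tally{$a} || $a cmp $b } keys %tally;
```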

Questions?