#! /usr/bin/perl

use strict ;
use warnings ;

package Qdb ;

use JSON::XS ;
use DBI ;
use IO::Pipe ;
use IO::Select ;
use IO::Socket::INET ;
use Proc::Daemon 0.14 ;
use Net::hostent qw(gethost) ;
use Net::Netmask ;
use Socket qw(inet_ntoa) ;
use Time::HiRes ;
use File::Path ;
use Digest::MD5 ;

use constant
  { PROG => 'qdb'
  } ;

use constant
  { REPO => 'https://svn.science.uu.nl/repos/project.qdb/'
  , PLVL => '12'
  , CDIR => '/etc/' . PROG
  , MAXQ => 1000
  , DEF_LITE => 'cpan.lite'
  , TAB_EVTS => 'events'
  , TAB_META => 'meta'
  , DEF_LOGF => PROG . ".log"
  , CNF_PUSH => "push"
  } ;

our $VERSION  = '0.04' ; sub VERSION { $VERSION ; }

sub get_revision
  { my $mk = shift ; # ignore -f 'REVISION'
    return PLVL unless PLVL =~ /REVISION/ ;
    my $res ;
    if ( $mk or ! -f 'REVISION' )
      { my $tmp = `svn info Qdb.pm | grep Revision:` ;
        $res = ( $tmp and $tmp =~ /^Revision:\s+(\d+)/ ) ? $1 : 'l' ;
      }
    else
      { $res = `cat REVISION` ; chomp $res ; }
    $res ;
  }

our $REVISION = get_revision ;
our $Version  = "$VERSION-p$REVISION" ;
sub Version { sprintf "%s-%s", PROG, $Version ; }

###################################################################
package OBB ;

use constant PROG => Qdb::PROG ;

our ( @EXPORT, @EXPORT_OK, @ISA ) ;

BEGIN
  { require Exporter ;
    @EXPORT    = qw(logs logq logt logv logd TT) ;
    @EXPORT_OK = qw() ;
    @ISA       = qw(Exporter)
  }

# use base 'OBB' ; __PACKAGE__ -> mk_getset ( qw(name ...) ) ;

use Carp ;

use constant VLVLS => qw(Silent Quiet Terse Verbose Debug Trace) ;
our %VLVLS = () ;
{ my $cnt = 1 ; $VLVLS { $_ } = $cnt ++ for ( VLVLS ) ; }
{ my $cnt = 1 ; for ( VLVLS ) { $VLVLS { $cnt } = $cnt ; $cnt ++ ; } }
sub _VLVL { $VLVLS { shift @_ } ; }
our $Verbosity = $VLVLS { Terse } ;
# get Verbosity ; or set Verbosity and return previous Verbosity
sub Verbosity
  { my $self = shift ;
    my $res  = $Verbosity ;
    if ( @_ )
      { my $tmp = shift @_ ;
        my $lvl = $VLVLS { $tmp } or die "bad level $tmp" ;
        $Verbosity = $lvl ;
      }
    $res ;
  }

sub date
  { my ( $sec, $mic ) = Time::HiRes::gettimeofday ;
    my $mcs = sprintf '%03d', int ( $mic / 1000 ) ;
    my ($X,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) = localtime $sec ;
    sprintf '%4d-%02d-%02d %02d:%02d:%02d.%03d', $year + 1900, $mon + 1, $mday
      , $hour, $min, $sec % 60, int ( $mic / 1000 + 0.5 ) ;
  }

sub fmt_txt
  { # sprintf can't have a @list as first argument
    my $fmt = shift ;
    my $txt = $fmt ; $txt = sprintf $fmt, @_ if @_ ; chomp $txt ;
    $txt ;
  }

sub logx
  { my $msg = fmt_txt @_ ; printf "%s %s[%s] %s\n", date, PROG, $$, $msg ; }

for my $lvl ( VLVLS )
  { my $num = $VLVLS { $lvl } or die "bad num for lvl [$lvl]" ;
    my $sub = sprintf "sub $lvl { \$Verbosity >= $num ; }" ;
    eval $sub ; die $@ if $@ ;
    my $ini = lc substr $lvl, 0, 1 ;
    $sub = "sub log$ini { logx ( \@_ ) if OBB -> $lvl ; }" ;
    unless ( $lvl eq 'Trace' ) { eval $sub ; die $@ if $@ ; }
  }

sub mk_getset
  { my $self = shift ;
    my $sub  = <<'SUB' ;
sub %s::%s { my $self = shift ; $self -> {%s} = shift if @_ ; $self -> {%s} ; }
SUB
    my @bads = grep ! /^[A-Za-z_]\w*$/, @_ ;
    die "mk_getset: bad name [@bads]\n" if @bads ;
    eval sprintf $sub, $self, $_, $_, $_ for @_ ;
  }
sub mk_get
  { my $self = shift ;
    my $sub  = <<'SUB' ;
sub %s::%s
  { my $self = shift ;
    Carp::confess "mk_get %s : can't set" if @_ ;
    $self -> {%s} ;
  }
SUB
    my @bads = grep ! /^[A-Za-z_]\w*$/, @_ ;
    die "mk_getset: bad name [@bads]\n" if @bads ;
    eval sprintf $sub, $self, $_, $_, $_ for @_ ;
  }
sub New  { my $self = shift ; bless {}, $self ; }
sub Defs { () ; }
sub Init
  { my $self = shift ;
    my %opts = ( $self -> Defs, @_ ) ;
    my @opts = keys %opts ;
    @$self { @opts } = @opts { @opts } ;
    $self ;
  }
sub Make  { my $self = shift ; $self -> New -> Init ( @_ ) ; }
sub Die
  { my $self = shift ;
    my $fmt  = shift ;
    my $msg  = sprintf "[error] $fmt", @_ ;
    Util::syslog ( $msg ) ;
    confess "confess $msg" ;
  }
sub Warn
  { my $self = shift ; my $fmt = shift ; warn sprintf "[warng] $fmt\n", @_ ; }
sub Xit
  { my $self = shift ;
    my $fmt = shift ;
    confess ( sprintf $fmt, @_ ) ;
  }
sub A_is { confess ( "! #args == $_[0]" ) unless $_[1] == $_[0] ; }
sub A_ge { confess ( "! #args >= $_[0]" ) unless $_[1] >= $_[0] ; }
sub A_in
  { A_is 3, scalar @_ ;
    my ( $lo, $hi, $sz ) = @_ ;
    confess ( "! #args in [$lo,$hi]" ) unless $lo <= $sz and $sz <= $hi ;
  }

sub A_defd
  { for ( my $i = 0 ; $i < scalar @_ ; $i ++ )
      { confess ( "! defined arg $i" ) unless defined $_[$i] ; }
  }

sub _addr
  { my $x = shift ;
    ( defined $x and ref $x ) ? sprintf ( "%s", $x ) : ref $x ;
  }
sub _blessed
  { my $r = shift ;
    my $res = ( defined $r and ref $r and UNIVERSAL::isa ( $r, 'UNIVERSAL' ) ) ;
    $res ;
  }

sub _unbless ;
sub _unbless
  { my $obj = shift ;
    my $lvl = shift ;
    my $hav = shift ;
    my @pth = @_ ;
    my $ind = '  ' x $lvl ;
    my $res ;
    if ( ! ref $obj )
      { if ( ! defined $obj )
          { $res = undef ; }
        elsif ( $obj eq '' )
          { $res = '' ; }
        else
          { $res = "$obj" ; }
      }
    elsif ( $hav -> { $obj } )
      { $res = sprintf "%s %s", ref $obj, $hav -> { $obj } ; }
    else
      { $hav -> { $obj } = join '/', @pth ;
        my $addr = _addr ( $obj ) ;
        if ( $addr =~ /HASH\(/ )
          { $res = {} ;
            for my $key ( sort keys %$obj )
              { my $val = $obj -> { $key } ;
                $res -> { $key } = _unbless $val, $lvl + 1, $hav, @pth, $key ;
              }
          }
        elsif ( $addr =~ /ARRAY\(/ )
          { my $i = 0 ;
            $res = [ map { _unbless $_, $lvl + 1, $hav, @pth, $i ++ } @$obj ] ;
          }
        elsif ( $addr =~ /GLOB\(/ )
          { $res = $addr ; }
        elsif ( ! ref $addr )
          { $res = $addr ; }
        else
          { die sprintf "can't unbless %s = %s", $addr, ref $obj ; }
      }
    $res ;
  }
sub Unbless { _unbless ( $_[1], 0 ) ; }

# if   $dst == undef : print to \*STDOUT
# elif ref dst == IO::FILE : print to $dst
# else $dst : return Util::as_text
sub Dmp
  { my $obj = shift ;
    my $tag = shift ;
    my $suf = shift || '' ;
    my $fh  = shift || \*STDOUT ;
    my $txt = JSON::XS -> new -> utf8 -> pretty -> canonical
      -> allow_nonref ( 1 ) -> encode ( OBB -> Unbless ( $obj ) ) ;
    $txt .= "\n" unless substr $txt, -1 eq "\n" ;
    print $fh ''
      . ( $tag ? "$tag :\n" : '' )
      . $txt
      . $suf
      ;
  }

sub Tx
  { # printf "TT[%s]\n", join '|', @_ ;
    my $fmt = shift || 'TT' ;
    my $txt = sprintf $fmt, map
      { ( ref $_ and $_ -> can ( 'diag' ) )
      ? $_ -> diag
      : Util::diag ( $_ )
      } @_ ;
    my ( $sec, $mic ) = Time::HiRes::gettimeofday ;
    sprintf "time %.3f %s\n", $sec + $mic / 1000000 - $^T, $txt ;
  }

sub TT { return unless OBB -> Trace ; print Tx ( @_ ) ; }

###################################################################
package Util ;

our ( @EXPORT, @EXPORT_OK, @ISA ) ;

BEGIN
  { require Exporter ;
    @EXPORT    = qw(syslog) ;
    @EXPORT_OK = qw() ;
    @ISA       = qw(Exporter)
  }

OBB -> import ;

use Exporter ;
use Net::hostent ;
use Socket ;

use constant PROG => Qdb::PROG ;

our $IP_PAT = '^\d{1,3}(\.\d{1,3}){3}(/\d\d?)?$' ;
our $CONF ;
our $STOP = 'dirty' ;
our $EXEC = 0 ;
our $MODE = 0 ; sub MODE { $MODE = shift if @_ ; $MODE ; } ;
our $hostname = `hostname` ; chomp $hostname ;

our $_addlog = 0 ;
our $LOGF_hndl ;
sub addlog
  { my $msg = shift ;
    return unless $CONF and ref ( $CONF ) eq 'Conf' ;
    return unless $CONF -> logdir ; # for Pmaker ;
    my $LOGF = $CONF -> log_file ;
    unless ( $LOGF_hndl and defined $LOGF_hndl -> fileno )
      { unless ( $LOGF_hndl = new IO::File ">>$LOGF" )
          { syslog ( "can't append [$LOGF] ($!)" ) unless $_addlog ++ ; }
        else
          { $LOGF_hndl -> autoflush ( 1 ) ; }
      }
    if ( $LOGF_hndl )
      { $LOGF_hndl -> printf
          ( "%s %s[%s] %s\n", scalar ( localtime )
          , PROG, $$, $msg
          ) ;
        $_addlog = 0 ;
      }
  }

sub _syslog
  { my $mesg = shift ;
    my $PROG = PROG ;
    my $lgr =
      ( grep -f $_, map { ( "/$_", "/usr/$_" ) } map "$_/logger", qw(bin sbin)
      ) [ 0 ] ;
    if ( $lgr )
      { my @SYSL = ( $lgr, qw(-p user.err -t), "${PROG}[$$]", '--', $mesg ) ;
        system @SYSL ; # ignore everything
      }
  }

sub syslog
  { my $fmt = shift ;
    my $msg = sprintf $fmt, @_ ; chomp $msg ;
    logt ( $msg . ( $MODE ? " [syslog'ed]" : '' ) ) ;
    _syslog $msg if $MODE ;
  }

sub uniq  { my %x = () ; for ( @_ ) { $x { $_ } ++ ; } sort keys %x ; }

sub diag
  { my $x = shift ;
    $x =~ s/\n/\\n/g if defined $x ;
    unless ( defined $x )
      { '[undef]' ; }
    elsif ( $x eq '' )
      { '[empty]' ; }
    else
      { my $l = length ( $x ) ; my $c = $l < 1000 ;
        sprintf '[%s]', $c ? $x : substr ( $x, 0, 48 ) . "...($l)"  ;
      }
  }

sub get_host_ips
  { my $hnam = shift ;
    my $res  = undef ;
    if ( $hnam =~ /$IP_PAT/ )
      { $res = [ $hnam ] ; }
    elsif ( my $info = Net::hostent::gethost $hnam )
      { $res =
          [ uniq map { Socket::inet_ntoa $_ ; } @{ $info -> addr_list } ] ;
      }
    $res ;
  }

# cache localhost_ips ; re-resolve if not defined
our $localhost_ips = get_host_ips 'localhost' ;
sub localhost_ips { $localhost_ips || get_host_ips 'localhost' ; }

sub as_pvar { my $r = JSON::XS::decode_json ( $_[0] ) ; $r ; }
sub as_text { my $r = JSON::XS::encode_json ( $_[0] ) ; chomp $r ; $r ; }
sub pretty
  { my $v = shift ;
    my $txt = JSON::XS -> new -> utf8 -> pretty -> canonical
      -> allow_nonref ( 1 ) -> encode ( OBB -> Unbless ( $v ) ) ;
    chomp $txt ;
    $txt ;
  }

# may return undef ;
sub get_json
  { my $file = shift ;
    my $res ;
    if ( open FILE, '<', $file )
      { $res = JSON::XS -> new -> decode ( join '', <FILE> ) ; close FILE ; }
    $res ;
  }

sub basename { my $x = shift || $0 ; substr $x, 1 + rindex $x, '/' ; }
sub dirname
  { my $x = shift || $0 ;
    my $idx = rindex $x, '/' ;
    ( $idx == -1 ) ? '.' : substr $x, 0, $idx ;
  }

our %s4u = ( 's' => 1 ) ;
$s4u { m } = 60 * $s4u { s } ;
$s4u { h } = 60 * $s4u { m } ;
$s4u { d } = 24 * $s4u { h } ;
$s4u { w } =  7 * $s4u { d } ;

sub s4uv
  { my $v = shift ;
    my $u = shift ;
    $v =  1  unless defined $v and length $v ;
    $u = 's' unless defined $u and length $u ;
    die "500: no s4u {$u}" unless exists $s4u { $u } ;
    $v * $s4u { $u } ;
  }

# return undef on 'bad spec'
# my $msg = '( [+-] NUM [smhdw] ) ...' ;
# $err = "bad spec ($spec) ; should be like '$msg'" ;
sub secs4spec
  { my $spc = shift ;
    my $num = '[-+]?\d+(\.\d)?' ;
    my $one = "($num)?([smhdw]?)" ;
    my $all = "^($one)+\$" ;
    my $res ;

    if ( $spc =~ /$all/ )
      { my $tmp = $spc ;
        $res = 0 ;
        while ( length $tmp )
          { die "500: '$tmp' ~! /^$one/" unless $tmp =~ /^$one/ ;
            my $num  = $1 ;
            my $unit = $3 ;
            $tmp = $' ;
            $res += s4uv $num, $unit ;
          }
      }
    $res ;
  }

sub sig_name
  { my $sig = shift ;
    ( split ' ', $Config::Config{'sig_name'} ) [ $sig ] || "sig#$sig" ;
  }

sub sig_num
  { my $nam = shift ;
    my @names = split ' ', $Config::Config{sig_name} ;
    my %sig_num ;
    @sig_num { @names } = split ' ', $Config::Config{sig_num} ;
    $sig_num { $nam } ;
  }

sub rotate
  { my $conf = shift ;
    my $log  = $conf -> log_file ;
    addlog "rotate $log" ;
    my $cnt  = ( split ' ', $conf -> rotate ) [ 0 ] ;
    unlink "$log.$cnt" ; # ignore status
    for ( my $i = $cnt - 1 ; $i > 0 ; $i -- )
      { my $src = sprintf "%s.%s", $log, $i ;
        my $dst = sprintf "%s.%s", $log, $i + 1 ;
        rename $src, $dst or addlog "can't rename $src, $dst"
          if -f $src ;
      }
    my $dst = "$log.1" ;
    my $msg = "can't reopen " ;
    rename $log, $dst or addlog "can't rename $log, $dst" if -f $log ;
    close STDOUT ; open STDOUT, '>>', $log or _syslog ( "$msg STDOUT" ) ;
    close STDERR ; open STDERR, '>>', $log or _syslog ( "$msg STDERR" ) ;
  }

sub mk_sum { Digest::MD5 -> new -> add ( $_[0] ) -> hexdigest ; }

# returns a list of all events with epoch > $epoch
# # on error returns a (scalar) error

sub get_events_since
  { my $epoch = shift ;
    my $CPAN  = shift ;
    OBB::logd ( 'get_events_since %s from %s', $epoch, $CPAN ) ;
    my $rcnt = get_json ( "$CPAN/RECENT.recent" )
      or return "can't get $CPAN/RECENT.recent" ;
    my $aggr = $rcnt -> {meta} {aggregator}
      or return "can't get meta/aggregator" ;
    my $add = [] ;
    my $lst ;
    for my $tag ( @$aggr )
      { my $file = "$CPAN/RECENT-$tag.json" ;
        OBB::logd ( "getting $file ..." ) ;
        my $json = Util::get_json $file or return "can't get $file" ;
        my $list = $json -> {recent} or return "no recent in $file" ;
        while ( @$list )
          { my $item  = shift @$list ;
            my $ipoch = $item -> {epoch} ;
            my $path  = $item -> {path} ;
            my $type  = $item -> {type} ;
            if ( $ipoch <= $epoch )
              { OBB::logd ( "done $tag" ) ;
                # un-shift $item ; not used ;
                unshift @$list, $item ;
                last ;
              }
            # skip overlap
            if ( defined $lst and $ipoch >= $lst )
              { OBB::logd ( "skp $tag $type $ipoch $path" ) ;
                next ;
              }
            OBB::logv ( "add $tag $type $ipoch $path" ) ;
            unshift @$add , [ $type , $path , $ipoch ] ;
            $lst = $ipoch ;
          }
        last if @$list ;
      }
    $add ;
  }

###################################################################
package Msg ;
use base 'OBB' ;
__PACKAGE__ -> mk_getset ( qw(typ msg) ) ;

sub out
  { printf "*** [%s] %s\n"
      , $Msgs::TYPS { $_[0] -> typ }, $_[0] -> msg ;
  }

###################################################################
package Msgs ;
use base 'OBB' ;
__PACKAGE__ -> mk_getset ( qw(list cnt) ) ;
sub Defs { ( list => [], cnt => {} ) ; }

our %TYPS = ( err => 'error', wrn => 'warning' ) ;
sub typ_ok { $TYPS { $_[0] } ; }

sub add
  { OBB::A_is 3, scalar @_ ;
    my $self = shift ;
    my $typ = shift ;
    my $msg = shift ;
    $self -> Xit ( 'Msgs add : bad typ %s', $typ ) unless typ_ok $typ ;
    push @{ $self -> list }, Msg -> Make ( typ => $typ, msg => $msg ) ;
    $self -> cnt -> { $typ } ++ ;
    $self ;
  }

sub err { $_[0] -> add ( 'err', $_[1] ) ; }
sub wrn { $_[0] -> add ( 'wrn', $_[1] ) ; }
sub errs { $_[0] -> cnt -> { 'err' } ; }
sub wrns { $_[0] -> cnt -> { 'wrn' } ; }

sub Xit_on_errs
  { my $self = shift ;
    my $mesg = shift ;
    $_ -> out for @{ $self -> list } ;
    $self -> Die ( $mesg || 'exit' ) if $self -> errs ;
    @{ $self -> list } = () ;
    $self ;
  }

###################################################################
package Conf ;
use base 'OBB' ;

use constant FILES => ( 'qdb.conf', Qdb::CDIR . '/conf' ) ;
use constant
  { LOGFILE => 'qdb.log'
  , PIDFILE => 'qdb.pid'
  , LCKFILE => 'qdb.lck'
  , STPFILE => 'qdb.stp'
  , DBSFILE => 'qdb.lite'
  } ;

our @FILES = FILES ;
our %DEF_CONF =
  ( port   => 22722
  , cpan   => undef # path/to/cpan
  , logdir => '/var/log/qdb'
  , vardir => '/var/qdb'
  , rundir => '/var/run/qdb'
  , lckdir => '/var/lock/subsys'
  , loglevel => 'Terse'
  , rotate   => '8 1d'
  , max_idle => '5m'
  , permit   => []
  , secret   => undef
  , server   => 'localhost'
  , ival_next_rrr => 0
  , db_init  => 0
  ) ;

__PACKAGE__ -> mk_getset ( qw(file errs), keys %DEF_CONF ) ;

sub Init
  { my $self = shift ;
    my %opts = ( @_ ) ;
    $self -> OBB::Init ( %DEF_CONF, %opts ) ;
    my $optc = $opts { file } ;
    my $file = _use_file ( $optc || @FILES ) ;
    $self -> file ( $file ) ;
    $self -> Die
      ( $optc
      ? "can't find dmon config file [$optc]"
      : sprintf "no dmon config found [%s]", join ',', @FILES
      ) unless defined $file ;
    $self -> errs ( Msgs -> Make ) ;
    $self -> _get_conf ( $file ) -> Xit_on_errs ( 'config errors' ) ;
    $self ;
  }

sub _use_file
  { for my $cand ( @_ ) { return $cand if defined $cand and -r $cand ; }
    undef ;
  }

sub _get_conf
  { my $self = shift ;
    my $path = shift ;
    my $errs = $self -> errs ; 
    my $prev = undef ;
    my @lines = () ;
    open CONF, '<', $path or return $errs -> err ( "can't open $path ($!)" ) ;
    while ( <CONF> )
      { next if /^#/ ;
        next if /^\s*$/ ;
        if ( /^\s/ )
          { s/^\s+// ;
            if ( defined $prev ) { $prev .= " $_" ; } else { $prev = $_ ; }
          }
        else
          { push @lines, $prev if defined $prev ; $prev = $_ ; }
      }
    push @lines, $prev if defined $prev ;
    close CONF ;
    for my $line ( @lines )
      { chomp $line ;
        my ( $key, $val ) = split ' ', $line, 2 ;
        $errs -> err ( "missing value for key $key" ) unless defined $val ;
        unless ( $key eq 'include' or exists $DEF_CONF { $key } )
          { $errs -> err ( "bad key '$key'" ) ; }
        else
          { return $errs -> err ( "Conf can't '$key'" )
              unless $self -> can ( $key ) ;
            my $type = ref $self -> $key ;
            if ( $type eq 'ARRAY' )
              { push @{ $self -> $key }, split ' ', $val ; }
            elsif ( $type eq 'HASH' )
              { my ( $k, $v ) = split ' ', $val, 2 ;
                $self -> $key -> { $k } = $v ;
              }
            else
              { $self -> $key ( $val ) ; }
          }
      }
    $errs ;
  }

sub include
  { my $self = shift ;
    my $path = shift ;
    $self -> _get_conf ( $path ) ;
  }

sub pid_file { sprintf '%s/%s', $_[0] -> rundir, PIDFILE ; }
sub lck_file { sprintf '%s/%s', $_[0] -> rundir, LCKFILE ; }
sub stp_file { sprintf '%s/%s', $_[0] -> rundir, STPFILE ; }
sub dbs_file { sprintf '%s/%s', $_[0] -> vardir, DBSFILE ; }

sub mkdirs
  { my $self = shift ;
    my @keys = qw(logdir rundir vardir) ;
    my @res  = () ;
    for my $key ( @keys )
      { my $dir = $self -> $key ;
        next if -d $dir  ;
        File::Path::make_path ( $dir, { verbose => 1 , mode => 0755, } ) ;
      }
  }

sub log_file { sprintf '%s/%s', $_[0] -> logdir, LOGFILE ; }
sub _make_secr
  { my $res = '' ;
    my @abc = ( 'a' .. 'z' ) ;
    for my $i ( 1 .. 12 ) { $res .= $abc [ int rand ( 26 ) ] ; }
    $res ;
  }

sub make_stp
  { my $self = shift ;
    my $file = $self -> stp_file ;
    if ( open STOP, ">$file" )
      { printf STOP "%s\n", _make_secr ;
        close STOP ;
        chmod 0600, $file or logt ( "can't chmod $file" ) ;
      }
    else
      { logt ( "can't write stop file [$file] ($!) ; nevermind" ) ; }
  }

sub read_stp
  { my $self = shift ;
    my $file = $self -> stp_file ;
    my $res  = undef ;
    if ( open STOP, $file )
      { chomp ( $res = <STOP> ) ; close STOP ; }
    else
      { logt ( "can't read stop file [$file] ($!) ; nevermind" ) ; }
    $res ;
  }

sub rm_stp  { my $self = shift ; unlink $self -> stp_file ; }
sub own_pid { my $self = shift ; ( stat $self -> pid_file ) [ 4 ] ; }

sub sys_lock
  { sprintf "%s/%s", $_[0] -> lckdir, Qdb::PROG ; }
sub make_sys_lock
  { my $res = 0 ;
    if ( open SYSLOCK, ">>", $_[0] -> sys_lock )
      { close SYSLOCK ; $res = 1 ; }
    $res ;
  }
sub rm_sys_lock { unlink $_[0] -> sys_lock ; }

sub rot_cnt { my $self = shift ; ( split ' ', $self -> rotate ) [ 0 ] ; }
sub rot_spc { my $self = shift ; ( split ' ', $self -> rotate ) [ 1 ] ; }
sub rot_ivl { my $self = shift ; Util::secs4spec ( $self -> rot_spc ) ; }

##############################################################
package TS ;

use base 'OBB' ;
__PACKAGE__ -> mk_getset ( qw(file name dbh Err) ) ;
sub Defs { ( file => '', name => 'data' ) ; }

sub Reset { my $self = shift ; $self -> Err ( undef ) ; $self ; }

sub nn   { my $x = shift ; not not $x ; }
sub null { defined $_[0] ? $_[0] : 'NaN' ; }
sub tnam { my $x = pop ; $x =~ s/[^\w]/_/g ; $x }

sub connect
  { my $self = shift -> Reset ;
    my $file = $self -> file ;
    my $res  = 0 ;
    unless ( $file )
      { $self -> Err ( 'no file' ) ; }
    else
      { # $self -> disconnect ;
        my $dbh = DBI -> connect
          ( "dbi:SQLite:dbname=$file", "", ""
          , { AutoCommit => 1
            , RaiseError => 0
            , sqlite_see_if_its_a_number => 1
            }
          ) ;
        OBB::TT ( 'connect : dbh %s state %s', $dbh, $dbh -> state ) ;
        $res = $self -> dbh ( $dbh ) ;
      }
    $res ;
  }

sub disconnect
  { my $self = shift -> Reset ;
    my $dbh  = $self -> dbh ;
    my $res  = 1 ;
    if ( $dbh and $dbh -> state )
      { $res = $dbh -> disconnect ;
        $self -> dbh ( undef ) if $res ;
      }
    $res ;
  }

sub _table_info
  { my $self = shift ;
    my $dbh  = $self -> dbh ;
    my $res  = undef ;
    return $res unless $dbh ;
    my $sth = $dbh -> table_info ( undef, undef, '%', 'TABLE' ) ;
    if ( $sth )
      { $res = $sth -> fetchall_arrayref ; }
    else
      { $self -> Err ( "_table_info : can't" ) ; }
    $res ;
  }

sub _column_info
  { my $self = shift ;
    my $name = shift ;
    my $dbh  = $self -> dbh  ;
    my $res  = undef ;
    return $res unless $dbh and $name ;
    my $sth = $dbh -> column_info ( undef, undef, $name, '%' ) ;
    if ( $sth )
      { $res = $sth -> fetchall_arrayref ; }
    else
      { $self -> Err ( "_column_info : can't" ) ; }
    $res ;
  }

sub _tabs
  { OBB::A_is 1, scalar @_ ;
    my $self = shift -> Reset ;
    my $info = $self -> _table_info ;
    my $res  = undef ;
    if ( $info )
      { $res = [ sort map { $_ -> [2] ; } @$info ] ; }
    else
      { $self -> Err ( "_table_info : can't" ) ; }
    $res ;
  }

sub _cols
  { OBB::A_is 2, scalar @_ ;
    my $self = shift -> Reset ;
    my $name = shift ;
    my $info = $self -> _column_info ( $name ) ;
    my $res  = undef ;
    if ( $info )
      { $res = [ sort map { $_ -> [3] ; } @$info ] ; }
    else
      { $self -> Err ( "_column_info : can't" ) ; }
    $res ;
  }

sub has_tab
  { OBB::A_is 2, scalar @_ ;
    my $self = shift ;
    my $name = shift ;
    scalar grep $_ eq $name, @{ $self -> _tabs } ;
  }

sub has_col
  { OBB::A_is 3, scalar @_ ;
    my $self = shift ;
    my $name = shift ;
    my $col  = shift ;
    scalar grep $_ eq $col, @{ $self -> _cols ( $name ) } ;
  }

sub has_cols
  { OBB::A_ge 3, scalar @_ ;
    my $self = shift ;
    my $name = shift ;
    my @cols = @_ ;
    my %have ; $have { $_ } ++ for @{ $self -> _cols ( $name ) } ;
    for my $col ( @cols ) { return 0 unless $have { $col } ; }
    1 ;
  }

sub _flat ;
sub _flat
  { my @res = () ;
    for my $itm ( @_ )
      { push @res, ( ( ref $itm eq 'ARRAY' ) ? _flat @$itm : $itm )
          if defined $itm ;
      }
    @res ;
  }

our %KWDS ;
our @KWDS = qw(where group_by having order_by limit) ;
@KWDS { @KWDS } = map { my $x = $_ ; $x =~ s/_/ / ; uc $x ; } @KWDS ;

sub _mk_sql_select
  { OBB::A_ge 1, scalar @_ ;
    my $name = shift ;
    my %opts = ( from => $name , @_ ) ;
    my $cols = join ( ',', _flat $opts{cols} ) || "$name.*" ;
    sprintf "SELECT %s FROM %s %s %s %s %s %s"
      , $cols, $opts { from }
      , map { $opts{$_} ? "$KWDS{$_} $opts{$_}" : '' ; } @KWDS ;
  }

sub _mk_sql_delete
  { OBB::A_ge 1, scalar @_ ;
    my $name = shift ;
    my %opts = ( from => $name , @_ ) ;
    sprintf "DELETE FROM %s %s"
      , $opts { from }
      , map { $opts{$_} ? "$KWDS{$_} $opts{$_}" : '' ; } qw(where) ;
  }

sub _select_sth
  { OBB::A_ge 2, scalar @_ ;
    my $self = shift -> Reset ;
    my $name = shift ;
    my %opts = @_ ;
    my $dbh  = $self -> dbh ;
    unless ( $self -> has_tab ( $name ) )
      { $self -> Err ( "no table $name" ) ; return undef ; }
    my $SQL  = _mk_sql_select ( $name, %opts ) ;
    printf "${SQL}\n" if $self -> Debug ;
    my $sth = $dbh -> prepare ( $SQL ) ;
    $self -> Err ( "can't prepare $SQL" ) unless $sth ;
    $sth ;
  }

sub _select
  { OBB::A_ge 4, scalar @_ ;
    my $self = shift ;
    my $one  = shift ;
    my $tups = shift ;
    my $sth  = $self -> _select_sth ( @_ ) ;
    my $res  = [] ;
    if ( $sth )
      { $sth -> execute () ;
        if ( $tups eq 'tups' )
          { while ( my $row = $sth -> fetchrow_arrayref )
              { push @$res, [ @$row ] }
          }
        else
          { while ( my $hsh = $sth -> fetchrow_hashref )
              { push @$res, { %$hsh } }
          }
      }
    $one ? $res -> [ 0 ] : $res ;
  }

sub select
  { OBB::A_ge 2, scalar @_ ; shift -> _select ( 0, 'tups', @_ ) ; }
sub select_hash
  { OBB::A_ge 2, scalar @_ ; shift -> _select ( 0, 'hash', @_ ) ; }
sub select1
  { OBB::A_ge 2, scalar @_ ; shift -> _select ( 1, 'tups', @_ ) ; }
sub select1_hash
  { OBB::A_ge 2, scalar @_ ; shift -> _select ( 1, 'hash', @_ ) ; }

sub delete
  { OBB::A_ge 2, scalar @_ ;
    my $self = shift ;
    my $name = shift ;
    my %opts = @_ ;
    my $dbh  = $self -> dbh ;
    unless ( $self -> has_tab ( $name ) )
      { printf "no table $name\n" ; return [] ; }
    my $SQL  = _mk_sql_delete ( $name, %opts ) ;
    printf "${SQL}\n" if $self -> Debug ;
    my $res = $dbh -> do ( $SQL ) ;
    # or Carp::confess "can't do $SQL" ;
    $res ;
  }

sub count
  { OBB::A_ge 2, scalar @_ ;
    my $self = shift ;
    my $name = shift ;
    my %opts = @_ ;
    my $tups = $self -> select ( $name, %opts, cols => 'count(*) as count' ) ;
    $tups -> [ 0 ] [ 0 ] ;
  }

sub aggregates
  { OBB::A_ge 4, scalar @_ ;
    my $self = shift ;
    my $name = shift ;
    my $col  = shift ;
    my @typs = @_ ;
    my $tups = $self -> select
      ( $name , cols => [ map "$_($col) as $_", @typs ] ) ;
    @typs > 1 ? @{ $tups -> [ 0 ] } : $tups -> [ 0 ] [ 0 ] ;
  }

sub MIN { aggregates ( @_, 'MIN' ) ; }
sub MAX { aggregates ( @_, 'MAX' ) ; }
sub AVG { aggregates ( @_, 'AVG' ) ; }
sub MMA { aggregates ( @_, qw(MIN MAX AVG) ) ; }

sub save_hash
  { OBB::A_in 3, 5, scalar @_ ;
    my $self = shift ;
    my $name = shift ;
    my $hash = shift ;
    my $time = shift || time ;
    my @cols = () ;
    my @vals = () ;
    my $SQL  ;

    for my $atr ( sort keys %$hash )
      { my $val = $hash -> { $atr } ;
        if ( $self -> has_col ( $name, $atr ) )
          { push @cols, $atr ; push @vals, $val ; }
        else
          { printf "[warn] no $name.$atr ignore [%s]\n", ( $val || '' ) ; }
      }

    unless ( @cols ) { printf "[err] no valid cols\n" ; return 0 ; }

    my $clst = join ',', @cols ;
    my $vlst = join ',', map '?', @cols ;
    $SQL = sprintf "INSERT INTO %s ( %s ) VALUES ( %s )"
      , $name, $clst, $vlst ;
    
    printf "${SQL} ; [%s]\n", join ',', map { null $_ ; } @vals
      if $self -> Debug ;
    $SQL .= "\n" ;
    my $sth = $self -> dbh -> prepare ( $SQL ) or
      Carp::confess "can't prep $SQL" ;
    $sth -> execute ( @vals ) ? $time : 0 ;
  }

# returns ( $err [, $res ] )
sub insert_tups
  { OBB::A_is 4, scalar @_ ;
    my $self = shift ;
    my $name = shift ;
    my $nams = shift ;
    my $tups = shift ;
    my @cols = () ;
    my @idxs = () ;
    my $err ;
    my $res ;

    my $clst = join ',', @$nams ;
    my $vlst = join ',', map '?', @$nams ;
    my $SQL = sprintf "INSERT INTO %s ( %s ) VALUES ( %s )\n"
      , $name, $clst, $vlst ;

    print $SQL if $self -> Debug ;
    my $dbh = $self -> dbh ;
    my $sth ;
    my $cnt = 0 ;
    $dbh -> begin_work
      or return ( $dbh -> errstr || "can't begin" ) ;
    $sth = $dbh -> prepare ( $SQL )
      or return ( $dbh -> errstr || "can't prepare $SQL" ) ;
    for my $tup ( @$tups )
      { printf "WITH [%s]\n", join ',', @$tup if $self -> Debug ;
        unless ( $sth -> execute ( @$tup ) )
          { $sth -> finish ; return "bad tup [@$tup]" ; }
        $cnt ++ ;
      }
    $dbh -> commit or return ( $dbh -> errstr || "can't commit" ) ;
    ( undef, $cnt ) ;
  }

##############################################################
package Buff ;
use base 'OBB' ;
__PACKAGE__ -> mk_getset ( qw() ) ;

use constant BUF_SIZE => 65536 ;

sub new  { my $self = shift ; bless { b => '' }, $self ; }
sub Init { my $self = shift ; my $s = shift ; $self->{b} = $s ; $self ; }
sub Make { my $self = shift ; $self -> new -> Init ( @_ ) ; }
sub get  { my $self = shift ; $self->{b} ; }
sub set  { my $self = shift ; my $s = shift ; $self->{b} = $s ; }
sub add  { my $self = shift ; my $s = shift ; $self->{b} .= $s ; $self ; }
sub length { my $self = shift ; length $self->{b} ; }
sub index
  { my $self = shift ;
    my $str = shift ;
    my $off = shift || 0 ;
    index $self->{b}, $str, $off ;
  }
sub substr
  { my $argc = scalar @_ ;
    my $self = shift ;
    my $off  = shift ;
    my $len  = shift ;
    my $rpl  = shift ;
    if ( $argc == 4 )
      { CORE::substr ( $self->{b}, $off, $len, $rpl ) ; }
    elsif ( $argc == 3 )
      { CORE::substr ( $self->{b}, $off, $len ) ; }
    elsif ( $argc == 2 )
      { CORE::substr ( $self->{b}, $off ) ; }
  }
sub del
  { my $self = shift ;
    my $len  = shift ;
    $self->{b} = CORE::substr ( $self->{b}, $len ) ;
  }
sub diag
  { my $self = shift ; sprintf "$self\n  %s", Util::diag ( $self -> get ) ; }

# Buff
sub STATE
  { my $self = shift ;
    my $tag  = shift ;
    sprintf "  $tag [%s] len [%s]", $self , $self -> length ;
  }

sub sysread
  { my $self = shift ;
    my $inp  = shift ;
    sysread $inp, $self->{b}, BUF_SIZE, CORE::length ( $self->{b} ) ;
  }

sub too_big { my $self = shift ; $self -> length > 10 * BUF_SIZE ; }

##############################################################
package Qdb::Thread ;
use base 'OBB' ;
__PACKAGE__ -> mk_getset ( qw(tid inp out app pool ltim vars DONE) ) ;
__PACKAGE__ -> mk_get    ( qw(binp bout) ) ;

sub Init
  { my $self = shift ;
    $self -> { inp  } = shift ;
    $self -> { out  } = shift ;
    $self -> { binp } = Buff -> Make ( '' ) ;
    $self -> { bout } = Buff -> Make ( '' ) ;
    $self -> { base } = undef ;
    $self -> { ltim } = time ;
    $self -> { vars } = {} ;
    $self -> { DONE } = 0 ;
    $self ;
  }

sub ready_to_do_lines    
  { my $self = shift ;
    my $idx  = $self -> binp -> index ( "\n" ) ;
    my $res  = 0 ;
    if ( $self -> can ( 'timeout' ) and $self -> timeout )
      { $res = 1 ;  
        OBB::TT 'ready timed out %s timeout %s', $self, $self -> timeout ;
      }
    elsif ( $self -> DONE )
      { OBB::TT "DONE (ready_to_do) %s bout %s", $self, $self -> bout ;
        $res = ! $self -> bout -> length ;
      }
#   elsif ( $self -> binp == $self -> bout )
#     { $res = 0 ; } 
#   elsif ( $idx != -1 )
#     { $res = 1 ;
#       OBB::TT 'ready have inp  %s %s', $self, $idx ;
#     }
    $res ;
  }

sub done
  { my $self = shift ;
    my $res  ;
    if ( $self -> can ( 'timeout' ) and $self -> timeout )
      { OBB::TT "timeout ; %s is done", $self ;
        $res = 1 ;
      }
    elsif ( $self -> DONE )
      { OBB::TT "DONE (done) %s bout %s", $self, $self -> bout ;
        $res = ! $self -> bout -> length ;
        OBB::TT "DONE (done) res %s", $res ;
      }
    else
      { my $len = $self -> binp -> sysread ( $self -> inp ) ;
        OBB::TT "read %s %s", $self, $self -> binp ;
        if ( $len ) { $self -> do_lines ; $self -> ltim ( time ) ; }
        $res = ! $len ;
      }
    $self -> Xit ( 'bad res for done()' ) unless defined $res ;
    $res ;
  }

# by default, copy to bout
sub do_line
  { my $self = shift ;
    my $line = shift ;
    $line ;
  }

sub do_lines
  { my $self = shift ;
    my $binp = $self -> binp ;
    my $bout = $self -> bout ;
    if ( $self -> binp -> length and $binp != $bout )
      { my $pos = 0 ;
        for ( my $idx = $binp -> index ( "\n", $pos )
            ; $idx != -1
            ; $idx = $binp -> index ( "\n", $pos )
            )
          { $bout -> add
              ( $self -> do_line ( $binp -> substr ( $pos, $idx + 1 - $pos ) )
              ) ;
            $pos = $idx + 1 ;
          }
        $binp -> del ( $pos ) ;
      }
  }

sub flush
  { my $self = shift ;
    my $tout = shift || 0 ;
    my $out  = $self -> out ;
    my $str  = $self -> bout -> get ;
    my $err  = undef ;
    my $res  = undef ;
    unless ( $out )
      { print "Qdb : no out\n" if $self -> Terse ;
        return undef ;
      }
    my $time = time ;

    if ( $tout )
      { eval
          { alarm $tout ;
            $res = syswrite $out, $str ;
            $err = $! unless defined $res ;
            alarm 0 ;
          } ;
        alarm 0 ;
        print "flush : eval[$@]\n" if $@ and $self -> Terse ;
      }
    else
      { $res = syswrite $out, $str ;
        $err = $! unless defined $res ;
      }

    if ( defined $res )
      { $self -> bout -> del ( $res ) ; }
    else
      { my $ival = time - $time ;
        printf
          ( "flush: syswrite returned undef (%s) tout[%s] sec[%s]\n"
          , $err, $tout, $ival
          ) if $self -> Terse ;
      }
    $res ;
  }

sub stop
  { my $self = shift ;
    for my $h ( $self -> inp, $self -> out )
      { next unless ref ( $h ) ; 
        $h -> flush if $h -> can ( 'flush' ) ;
      }
    $self ;
  }

sub var
  { OBB::A_in 2, 3, scalar @_ ;
    my $self = shift ;
    my $var  = shift ;
    if ( @_ )
      { my $val = shift ;
        OBB::TT ( 'NEW VAR %s', $var ) unless exists $self -> vars -> { $var } ;
        OBB::TT ( 'SET VAR %s %s', $var, $val ) ;
        $self -> vars -> { $var } = $val ;
        OBB::TT ( 'GET VAR %s %s', $var, $self -> vars -> { $var } ) ;
      }
    $self -> vars -> { $var } ;
  }

sub Dump
  { my $self = shift ;
    if ( $self -> Debug )
      { my $binp = $self -> binp -> get ; $binp =~ s/\n/\\n/g ;
        my $bout = $self -> bout -> get ; $bout =~ s/\n/\\n/g ;
        printf " self : %s %d\n", $self, $self -> tid ;
        printf "   inp [$self->{inp}]\n" ;
        printf "   out [$self->{out}]\n" ;
        printf "   binp len %2d %s\n", $self -> binp -> length, $self -> binp ;
        printf "   [%s]\n", $binp if $self -> binp -> length ;
        printf "   bout len %2d %s\n", $self -> bout -> length, $self -> bout ;
        printf "   [%s]\n", $bout if $self -> bout -> length ;
      }
  }

sub diag { my $self = shift ; sprintf "%2d", $self -> tid ; }

# Qdb::Thread
sub STATE
  { my $self = shift ;
    [ "sock $self"
    , join "\n",
        ( sprintf ( "self %s" , $self )
        , sprintf ( "  inp [%s]" , $self-> inp )
        , sprintf ( "  out [%s]" , $self-> out )
        , sprintf ( "  app [%s]" , $self-> app )
        , sprintf ( "  binp [%d]", $self -> binp -> length )
        , sprintf ( "  bout [%d]", $self -> bout -> length )
        )
    ] ;
  }

sub state_buffs
  { my $self = shift ;
    my $res = [] ;
    for my $tag ( qw(binp bout) )
      { my $buff = $self -> { $tag } ;
        push @$res, $buff ? $buff -> STATE ( $tag ) : "no $tag"  ;
      }
    $res ;
  }

##############################################################
package Qdb::Thread::Service ;
use base 'Qdb::Thread' ;
__PACKAGE__ -> mk_getset ( qw(sock port allower) ) ;
OBB -> import ; Util -> import ;

sub Init
  { my $self = shift ;
    my %opts = ( port => undef, @_ ) ;
    my $port = $opts { port } ;
    my $sock = new IO::Socket::INET
      ( Listen => 128 , LocalPort => $port, ReuseAddr => 1 ) ;
    $self -> sock ( $sock ) ;
    $self -> port ( $port ) ;
    $self -> allower ( $opts { allower } ) ;
    $self -> Die ( "Could not create socket for $port ($!)" ) unless $sock ;
    $self -> Qdb::Thread::Init ( $sock, $sock ) ;
  }

sub done
  { my $self = shift ;
    my $base = shift ;
    my $sock = $self -> sock -> accept () ;
    my $peer = $sock -> peerhost () ;
    my $port = $sock -> peerport () ;
    if ( $self -> allower -> is_allowed ( $peer ) )
      { my $thrd = Qdb::Thread::Recv -> Make ( $sock ) ;
        $base -> Add ( $thrd, $self -> app ) ;
        Util::logt ( "open  <- $peer:$self->{port} remote $port" )
          if $self -> Terse ;
      }
    else
      { close $sock ;
        my $mesg = "peer $peer not allowed on port $self->{port}" ;
        Util::logt ( $mesg ) if $self -> Terse ;
      }
    0 ;
  }

# Qdb::Thread::Service
sub STATE
  { my $self = shift ;
    [ "service $self", sprintf "listening on port %s as a %s"
    , $self -> port
    , ref ( $self -> app )
    ] ;
  }

##############################################################
package Qdb::Thread::Recv ;
use base 'Qdb::Thread' ;
__PACKAGE__ -> mk_getset ( qw() ) ;
OBB -> import ;

sub Init
  { my $self = shift ;
    my $sock = shift ;
    $self -> Qdb::Thread::Init ( $sock, $sock ) ;
    $self -> vars
      ( { pretty => 0
        , chal => undef
        , auth => undef
        , Qdb::CNF_PUSH => []
        }
      ) ;
#   printf "vars %s\n", join ' ', sort keys %{ $self -> vars } ; exit ;
    $self ;
  }

sub help
  { my $max = Qdb::MAXQ ;
    return <<HELP ;
commands [in upper- or lower-cast] :
-- PING
-- QUIT
-- STATE
-- PRETTY [1] # sets PRETTY-mode
-- PRETTY 0   # re-sets PRETTY-mode
-- PRETTY?    # are we pretty ? 
-- HELP
-- Q query
-- AUTH1
-- AUTH2 \$sum
-- AUTH?
-- PUSH new|delete \$path [\$time]
-- PUSH?
-- COMMIT [ new|delete \$path [\$time] ]
where
-- By default, each command returns a (json) hash like
     { cmd => .., err => .., res => ... }
   If \$err is not empty, \$res is undefined.
   In PRETTY-mode, the result is human-readable, and not specified.
-- In response to AUTH1, the server sends some random challenge ;
   the client responds with : AUTH2 checksum("\$challenge \$secret\\n").
   The \$secret is shared between client and server.
-- PUSH stores a new event in a (temp) event-list ; default \$time is now.
   disconnects if the session is not AUTH'ed.
-- COMMIT adds the PUSHed events to the database, and empties the PUSH list ;
   disconnects if the session is not AUTH'ed.

queries :
-- first_id
-- last_id
-- from \$id [ limit \$n ]
-- path \$path
-- stat \$path
-- lstat \$path
-- readlink \$path
where
-- \$id, \$n are integers ; \$path is a path in CPAN.
   eg, "path authors/id/C/CA/CADE/Data-Tools-1.02.tar.gz".
-- Query 'from ...' returns a list of (at most $max) events.
-- Query 'path \$path' returns the last event regarding \$path.
-- Query 'stat \$path' returns "[stat /path/to/cpan/\$path]" ;
   'lstat' likewise.
-- Query 'readlink' returns "readlink /path/to/cpan/\$path".
HELP
  }

# Qdb::Thread::Recv
sub do_line
  { my $self = shift ;
    my $line = shift ;
    my $peer = $self -> inp -> peerhost ;
    chomp $line ;
    my ( $cmd, $arg ) = split ' ', $line, 2 ; $cmd = uc $cmd ;
    $cmd = '' unless defined $cmd ;
    $arg = '' unless defined $arg ;
    my $res ;
    my $err ;
    my $txt = $line ; $txt =~ s/STOP .{10}/STOP **********/ ;
    printf "%-15s : %s\n", $peer, $txt if $self -> Terse ;
    unless ( $cmd )
      { $res = '' ; }
    elsif ( $cmd !~ /^\w+(\??)$/ ) # WORD or WORD?
      { $err = "bad command [$cmd]" ; }
    elsif ( $cmd eq 'QUIT' )
      { $self -> DONE ( 1 ) ; $res = 'QUIT' ; }
    elsif ( $cmd eq 'PING' )
      { my $ver = Qdb -> Version ;
        $res = "PONG from $Util::hostname $ver" ;
      }
    elsif ( $cmd eq 'PRETTY?' )
      { $res = sprintf "PRETTY %s", ( $self -> var ( 'pretty' ) ? 1 : 0 ) ; }
    elsif ( $cmd eq 'PRETTY' )
      { $arg = 1 if $arg eq '' ;
        $self -> var ( 'pretty', ( $arg ? 1 : 0 ) ) ;
        $res = sprintf "PRETTY %s", ( $self -> var ( 'pretty' ) ? 1 : 0 ) ;
      }
    elsif ( $cmd eq 'STATE' )
      { my $logf = $self -> app -> conf -> log_file ;
        my $llvl = $self -> app -> conf -> loglevel ;
        my $hist = $self -> app -> hist -> file ;
        $res = ''
          . ( sprintf "-- version  %s\n", Qdb -> Version )
          . ( sprintf "-- logfile  %s\n", $logf )
          . ( sprintf "-- loglevel %s\n", $llvl )
          . ( sprintf "-- database %s\n", $hist )
          . $self -> pool -> STATE
          . "\n"
          . $self -> app  -> STATE
          ;
      }
    elsif ( $cmd eq 'HELP' )
      { $res = help ; }
    else
      { ( $err, $res ) = $self -> app -> command ( $cmd, $arg, $self ) ;
        $self -> STOP if $cmd eq 'STOP' and $res eq 'STOPPED' ;
      }
    my $pret = $self -> var ( 'pretty' ) ; 
    ( $pret
    ? ( $res
      ? ( ref ( $res ) ? Util::pretty $res : $res )
      : ( "error : $err" || 'no err or res ??' )
      )
    : Server::Result -> Make
        ( cmd => $cmd
        , err => ( $err ? $err : undef )
        , res => ( $err ? undef : $res )
        ) -> as_text
    ) . "\n" ;
  }

sub STOP
  { my $self = shift ;
    $self -> bout -> add ( "STOPPED\n" ) ;
    print "stopping ; self flush\n" if $self -> Debug ;
    $self -> flush ;
    print "stopping ; self shutdown\n" if $self -> Debug ;
    $self -> out -> shutdown ( 2 ) ; # done using
    print "stopping ; pool stop\n" if $self -> Debug ;
    $self -> pool -> stop ;
    print "stopping ; base stop done\n" if $self -> Debug ;
    Qdb::Daemon -> ulock ;
    print "stopping ; unlocked\n" if $self -> Debug ;
    $Util::STOP = 'clean' ;
    exit ;
  }

# Qdb::Thread::Recv
sub STATE
  { my $self = shift ;
    my $sock = $self -> inp ;
    [ "serving $self"
    , sprintf "%s is processing a command-session << %s port %s"
        , ref ( $self -> app )
        , $sock -> peerhost, $sock -> sockport
    ] ;
  }

###################################################################
package Server::Result ;
use base 'OBB' ;
OBB -> import ; Util -> import ;
__PACKAGE__ -> mk_getset ( qw(cmd err res) ) ;

sub from_text
  { my $self = shift ;
    my $text = shift ;
    $self -> Make ( %{ Util::as_pvar $text } ) ;
  }

sub as_text
  { my $self = shift ;
    Util::as_text { map { ( $_ => $self -> $_ ) } qw(cmd err res) } ;
  }

###################################################################
package Threads ;
use base 'OBB' ;

OBB -> import ; Util -> import ;
__PACKAGE__ -> mk_getset ( qw(conf list inps outs) ) ;
sub Defs { ( list => {}, inps => {}, outs => {} ) ; }

our $TID = 0 ;

sub _close
  { my $sock = shift ;
    my $tag  = shift ;
    OBB::TT "_close $tag %s", $sock ;
    if ( ref ( $sock ) =~ /Socket/ )
      { my $peer = $sock -> peerhost ;
        my $pprt = $sock -> peerport ;
        my $port = $sock -> sockport ;
        OBB::TT '  peer %s port %s'
          , $peer || 'no_peer' , $port || 'no_port'
            if $peer or $port ;
        logv ( "close $tag %s:%s\n", $peer, $port )
          if $peer and $port ;
      }
    $sock -> close if ref ( $sock ) and $sock -> can ( 'close' ) ;
  }

sub stop
  { my $self = shift ;
    for my $thrd ( values %{ $self -> list } )
      { $self -> Del ( $thrd -> stop ) ; }
  }

sub Add
  { OBB::A_is ( 3, scalar @_ ) ;
    my $self = shift ;
    my $thrd = shift ;
    my $app  = shift ;
    my $out  = $thrd -> out ;
    my $inp  = $thrd -> inp ;
    my $sam  = ( $inp and $inp == $out ) ? 'same' : $out ;
    my $tid  = $thrd -> tid  ( $TID ++ ) ;
    logd ( "Add %s %s\n", ref $thrd, $tid ) ;
    logd ( "  inp [%s]\n", $inp ) if $inp ;
    logd ( "  out [%s]\n", $sam ) if $out ;
    $thrd -> pool ( $self ) ;
    $thrd -> app  ( $app  ) ;
    $self -> list -> { $thrd } = $thrd ;
    $thrd ;
  }

sub Del
  { my $self = shift ;
    my $thrd = shift ;
    logd ( "Del %s %d\n", ref $thrd, $thrd -> tid ) ;
    my $out = $thrd -> out ;
    my $inp = $thrd -> inp ;
    _close $inp, '<-' ;
    _close $out, '->' ;
    delete ( $self -> list -> { $thrd } ) ;
  }

sub any_readers
  { my $self = shift ;
    my $list = $self -> list ;
    my $res = new IO::Select ;
    $self -> {inps} = {} ;
    for my $key ( %$list )
      { my $thrd = $list -> { $key } ;
        my $inp  = $thrd -> inp ;
        next unless $inp and ref ( $inp ) =~ /IO::/ ;
        $res -> add ( $inp ) ;
        $self -> {inps} { $inp } = $thrd ;
      }
    $res ;
  }

sub any_writers
  { my $self = shift ;
    my $list = $self -> list ;
    my $res = new IO::Select ;
    $self -> {outs} = {} ;
    for my $key ( %$list )
      { my $thrd = $list -> { $key } ;
        my $out  = $thrd -> out ;
        next unless $thrd -> bout -> length ;
        next if $thrd -> bout -> index ( "\n" ) == -1 ;
        next unless $out and ref ( $out ) =~ /IO::/ ;
        $res -> add ( $out ) ;
        $self -> {outs} { $out } = $thrd ;
      }
    $res ;
  }

sub any_inactives
  { my $self = shift ;
    my $maxi = $self -> conf -> max_idle ;
    grep { ref $_ eq 'Qdb::Thread::Recv' and time - $_ -> ltim > $maxi }
      values %{ $self -> list } ;
  }

our %BAD_TAG = ( IDLE => 'IDLE', BINP => 'LINE TO LONG' ) ;
sub BAD_TAG { my $tag = shift ; $BAD_TAG { $tag } || $tag ; }
sub any_bads
  { my $self = shift ;
    my $maxi = $self -> conf -> max_idle ;
    my $res  = { IDLE => [], BINP => [] } ;
    for my $t ( values %{ $self -> list } )
      { next unless ref $t eq 'Qdb::Thread::Recv' ;
        if ( time - $t -> ltim > $maxi )
          { push @{ $res -> { IDLE } }, $t ; }
        elsif ( $t -> binp and $t -> binp -> too_big )
          { push @{ $res -> { BINP } }, $t ; }
      }
    $res ;
  }

sub by_inp { $_[0] -> {inps} { $_[1] } ; }
sub by_out { $_[0] -> {outs} { $_[1] } ; }

sub do_a_loop
  { my $self = shift ;

    OBB::TT 'do_a_loop ...' ;

    for my $h ( $self -> any_readers -> can_read ( 3 ) )
      { my $thrd = $self -> by_inp ( $h ) ;
        OBB::TT sprintf 'reader %s', $thrd ;
        $self -> Del ( $thrd ) if $thrd -> done ( $self ) ;
      }

    for my $h ( $self -> any_writers -> can_write () )
      { my $thrd = $self -> by_out ( $h ) ;
        OBB::TT 'flush 2 %s %s', $thrd, $thrd -> bout ;
        $thrd -> flush ;
        if ( $thrd -> DONE and not $thrd -> bout -> length )
          { OBB::TT 'writer %s is done', $thrd ;
            $self -> Del ( $thrd ) ;
          }
      }

    my $bads = $self -> any_bads ;
    for my $tag ( sort keys %$bads )
      { my $lst = $bads -> { $tag } ;
        my $bad = BAD_TAG $tag ;
        my $msg = "Connection reset by peer ($bad)\n" ;
        for my $thread ( @$lst )
          { OBB::TT "delete bad thread %s %s", $thread, $bad ;
            logt ( "delete bad thread %s %s", $thread, $bad ) ;
            $thread -> bout -> set ( $msg ) ;
            $thread -> flush ( 1 ) ;
            $self -> Del ( $thread ) ;
          }
      }
  }

sub by_tid { $a -> tid <=> $b -> tid } ;

sub Dump
  { my $self = shift ;
    for my $thrd ( sort by_tid values %{ $self -> {list} } )
      { $thrd -> Dump ; }
  }

# Threads
sub STATE
  { my $self = shift ;
    my @list = () ;
    for my $tuple
      ( sort { $a -> [0] cmp $b -> [0] }
          map { $_ -> STATE } sort by_tid values %{ $self -> {list} }
      )
      { my ( $tag, @itms ) = @$tuple ; push @list, @itms ; }
    "threads :\n%s\n", join "\n", map { "-- $_" ; } @list ;
  }

###################################################################
package Qdb::Hist ;
use base 'TS' ;
__PACKAGE__ -> mk_getset ( qw() ) ;

sub Init
  { my $self = shift ;
    $self -> TS::Init ( @_ ) ;
    $self -> connect ;
    return undef unless $self -> dbh ;
    OBB::TT 'db connected %s file %s', !! $self -> dbh, $self -> file ;
    OBB::TT 'make_db done' ;
    $self ;
  }

sub get_meta
  { my $self = shift ;
    my $key  = shift ;
    my $sel  = $self -> select1
      ( Qdb::TAB_META , cols => 'val' , where => "key = '$key'" ) ;
    $sel -> [ 0 ] ;
  }

sub set_meta
  { my $self = shift ;
    my $key  = shift ;
    my $val  = shift ;
    $self -> insert_tups
      ( Qdb::TAB_META, [ qw(key val) ], [ [ $key, $val ] ] ) ;
  }

sub insert_events
  { my $self = shift ;
    my $list = shift ;
    $self -> insert_tups ( Qdb::TAB_EVTS, [ qw(type path time) ], $list ) ;
  }

###################################################################
package App ;
use base 'OBB' ;
__PACKAGE__ -> mk_getset ( qw(conf) ) ;

sub Init { my $self = shift ; $self -> OBB::Init ( @_ ) ; $self ; }

sub check_conf
  { my $self = shift ;
    my $conf = $self -> conf ;
    my $errs = $conf -> errs ; 
    my $llvl = $conf -> loglevel ;
    $errs -> err ( "Conf: bad loglevel [$llvl]" ) unless $OBB::VLVLS{$llvl} ;
    { $conf -> { cpan } =~ s!/+$/!! ;
      my $cpan = $conf -> { cpan } ;
      $errs -> err ( "can't find home CPAN (conf cpan => $cpan)" )
        unless -d $cpan ;
    }
    $errs ;
  }

###################################################################
package Server ;
use base qw(App) ;
__PACKAGE__ -> mk_getset ( qw(threads service hist) ) ;
OBB -> import ; Util -> import ;

use constant
  { PROG => 'qdb-server'
  , STOPPED  => 'STOPPED'
  , TAB_EVTS => Qdb::TAB_EVTS
  } ;

our $PATH_PAT = qr{[-./\w]+} ;

sub Init
  { my $self = shift ;
    my %opts = ( @_ ) ;
    OBB::TT 'Server Init %s', $self ;
    $self -> App::Init ( %opts ) ;
    my $conf = $self -> conf ;
    my $allower = Qdb::Allower -> Make
      ( spec => $conf -> permit, errs => $conf -> errs ) ;
    $self -> check_conf -> Xit_on_errs ;
    my $thrd = Qdb::Thread::Service -> Make
      ( port => $self -> port, allower => $allower ) ;
    $self -> service ( $self -> threads -> Add ( $thrd, $self ) ) ;
    my $hist = Qdb::Hist -> Make ( file => $conf -> dbs_file ) ;
    $self -> hist ( $hist ) ;
    OBB::TT 'Server Init %s', 'done' ;
    $self ;
  }

sub port { my $self = shift ; $self -> conf -> port ; }

sub check_conf
  { my $self = shift ;
    my $errs = $self -> App::check_conf ;
    my $conf = $self -> conf ;
    unless ( my $secr = $conf -> secret )
      { $errs -> err ( 'no secret' ) ; }
    elsif ( length $secr < 8 )
      { $errs -> wrn ( "secret too short" ) ; }
    unless ( @{ $conf -> permit } )
      { $errs -> err ( "no config for 'permit'" ) ; }
    # only server rotates
      { my $rotl = $conf -> rotate ;
        my ( $num, $spc ) = split ' ', $rotl ;
        unless ( defined $spc )
          { $errs -> err ( "Conf: bad rotate [$rotl]" ) ; }
        else
          { $errs -> err ( "Conf: bad number in rotate [$rotl]" )
              unless $num =~ /^\d+$/ ;
            $errs -> err ( "Conf: bad spec in rotate [$rotl]" )
              unless defined Util::secs4spec ( $spc ) ;
          }
      }
    for my $opt ( qw(ival_next_rrr max_idle) )
      { my $spec = $conf -> $opt ;
        my $secs = Util::secs4spec ( $spec ) ;
        unless ( defined $secs )
          { $errs -> err ( "Conf: bad spec [$spec] for option $opt" ) ; }
        else
          { $conf -> $opt ( $secs ) ; }
      }
    $errs ;
  }

# Server allow_command
sub allow_command
  { my $self = shift ;
    my $cmd  = shift ;
    my $thrd = shift ;
    my $peer = $thrd -> inp -> peerhost ;
    my $res  = 1 ;
    # commands only allowed from localhost
    my @COMMANDS = qw(STOP) ;
    if ( grep { $cmd eq $_ } @COMMANDS )
      { my $ips = Util::localhost_ips () ;
        $res = $ips ? grep { $peer eq $_ } @$ips : 0 ;
      }
    $res ;
  }

# Server commands ; must return ( $err, $res )
sub command
  { my $self = shift ;
    my $cmd  = shift ;
    my $arg  = shift ;
    my $thrd = shift ;
    my $peer = $thrd -> inp -> peerhost ;
    my $err  ;
    my $res  ;
    if ( ! $self -> allow_command ( $cmd, $thrd ) )
      { $err = "command not allowed [$cmd] from $peer" ; }
    elsif ( $cmd eq 'STOP' )
      { my $STPR = $self -> conf -> read_stp || 'no secret' ;
        if ( $arg eq $STPR )
          { $res = Server::STOPPED ; }
        else
          { $err = 'BAD SECRET' ; }
      }
    elsif ( $cmd eq 'Q' )
      { ( $err, $res ) = $self -> do_hist ( $cmd, $arg, $thrd ) ; }
    elsif ( $cmd =~ /^AUTH([12\?])$/ )
      { ( $err, $res ) = $self -> do_AUTH ( $cmd, $arg, $thrd ) ; }
    elsif ( $cmd =~ /^PUSH(\??)$/ )
      { ( $err, $res ) = $self -> do_PUSH ( $cmd, $arg, $thrd, $1 ) ; }
    elsif ( $cmd eq 'COMMIT' )
      { ( $err, $res ) = $self -> do_COMM ( $cmd, $arg, $thrd ) ; }
    else
      { $err = 'unknown command' ; }
    ( $err, $res ) ;
  }

sub _maptup  { my $tup = shift ; join ' ', @$tup ; }
sub _maptups { my $lst = shift ; [ map { _maptup $_ ; } @$lst ] ; }

sub get_db
  { my $hist = shift ;
    my $cpan = shift ;
    my $qwe  = shift ;
    my $res = '' ;
    my $err = '' ;
    my $LIM = 1000 ;
    $qwe =~ s/^\s+// ;
    $qwe =~ s/\s+$// ;
    unless ( $hist )
      { $err = 'no hist' ; }
    elsif ( $hist -> dbh -> state )
      { $err = 'not connected' ; }
    elsif ( ! $qwe )
      { $err = 'no query' ; }
    elsif ( $qwe eq 'first_id' )
      { $res = $hist -> MIN ( TAB_EVTS, 'id' ) ; }
    elsif ( $qwe eq 'last_id' )
      { $res = $hist -> MAX ( TAB_EVTS, 'id' ) ; }
    elsif ( $qwe =~ /^from\s+(\d+)$/ )
      { $res = _maptups $hist -> select
          ( TAB_EVTS
          , cols     => 'id, type, path'
          , where    => "id >= $1"
          , order_by => 'id'
          , limit    => $LIM
          ) ;
      }
    elsif ( $qwe =~ /^from\s+(\d+)\s+limit\s+(\d+)$/ )
      { my $lim = $2 ;
        $res = _maptups $hist -> select
          ( TAB_EVTS
          , cols     => 'id, type, path'
          , where    => "id >= $1"
          , order_by => 'id'
          , limit    => ( $lim < $LIM ? $lim : $LIM )
          ) ;
      }
    elsif ( $qwe =~ /^path\s+($PATH_PAT)$/ )
      { my $path = $1 ;
        $res = _maptup $hist -> select1
          ( TAB_EVTS
          , cols     => 'id, type, path'
          , where    => "path = '$path'"
          ) ;
      }
    elsif ( $qwe =~ /^(stat|lstat|readlink)\s+($PATH_PAT)$/ )
      { my $what = $1 ;
        my $path = $2 ;
        if ( grep $_ eq '..', split m!/!, $path )
          { $err = "bad path ; contains '..'" ; }
        elsif ( $what eq 'readlink' )
          { $res =   eval "$what '$cpan/$path'"   ; }
        else
          { $res = [ eval "$what '$cpan/$path'" ] ; }
      }
    else
      { $err = "bad query [$qwe]" ; }
    { err => $err , res => $res } ;
  }

sub do_hist
  { my $self = shift ;
    my $cmd  = shift ;
    my $arg  = shift ;
    my $thrd = shift ;
    my $hist = $self -> hist ;
    my $cpan = $self -> conf -> cpan ;
    my $qwe = $arg ;
    OBB::TT ( '*****************************' ) ;
    OBB::TT ( 'cmd %s arg %s qwe %s', $cmd, $arg, $qwe ) ;
    my $r = get_db $hist, $cpan, $qwe ;
    my $err = $r -> {err} ;
    my $res = $err ? undef : $r -> {res} ;
    ( $err, $res ) ;
  }

sub make_challenge { Util::mk_sum ( sprintf '%016d', int rand 10**16 ) ; }

sub do_AUTH
  { my $self = shift ;
    my $cmd  = shift ;
    my $arg  = shift ;
    my $thrd = shift ;
    my $err  ;
    my $res  ;
    OBB::TT ( 'cmd %s arg %s', $cmd, $arg ) ;
    if ( $cmd eq 'AUTH1' )
      { $res = $thrd -> var ( 'chal', make_challenge ) ; }
    elsif ( $cmd eq 'AUTH2' )
      { my $secr = $self -> conf -> secret ;
        my $chal = $thrd -> var ( 'chal' ) ;
        if ( ! defined $chal )
          { $err = 'AUTH2 before AUTH1' ; }
        elsif ( ! $arg )
          { $err = 'AUTH2 : no arg' ; }
        else
          { $res = $arg eq Util::mk_sum ( "$chal $secr\n" ) ;
            $thrd -> var ( 'auth', $res ) ;
          }
      }
    else # $cmd eq 'AUTH?'
      { $res = $thrd -> var ( 'auth' ) ; }
    $res ||= 0 ; # avoid $res eq '' or undefined
    ( $err, $res ) ;
  }

sub do_PUSH
  { my $self = shift ;
    my $cmd  = shift ;
    my $arg  = shift ;
    my $thrd = shift ;
    my $ques = shift || '' ;
    my $pskp = shift || '' ; # path test skip
    my $list = $thrd -> var ( Qdb::CNF_PUSH ) ;
    my $err  ;
    my $res  ;
    my @args = split ' ', $arg ;
    push @args, time if @args < 3 ;
    if ( ! $thrd -> var ( 'auth' ) )
      { $err = 'not authorised for push' ;
        $thrd -> pool -> Del ( $thrd ) ;
      }
    elsif ( $ques )
      { $res = @$list ; }
    elsif ( @args != 3 )
      { $err = "bad $cmd args" ; }
    else
      { my $cpan = $self -> conf -> cpan ;
        my $type = $args [ 0 ] ;
        my $path = $args [ 1 ] ;
        my $file = "$cpan/$path" ;
        if ( $type !~ /^(new|delete)$/ )
          { $err = "bad type [$type]" ; }
        elsif ( $path !~ /^($PATH_PAT)$/
             or grep $_ eq '..', split m!/!, $path
              ) 
          { $err = "bad path ; contains '..'" ; }
        else
          { if ( not $pskp )
              { if ( $type eq 'new' and ! -e $file )
                  { $err = "new not in \$cpan : $file" ; }
                elsif ( $type eq 'delete' and -e $file )
                  { $err = "del but in \$cpan : $file" ; }
              }
            unless ( $err )
              { push @$list, [ @args ] ;
                $res = @$list ;
              }
          }
      }
    $res ||= 0 ; # avoid $res eq '' or undefined
    ( $err, $res ) ;
  }

sub do_COMM
  { my $self = shift ;
    my $cmd  = shift ;
    my $arg  = shift ;
    my $thrd = shift ;
    my $err  ;
    my $res  ;
    OBB::TT ( 'cmd %s arg %s', $cmd, $arg ) ;
    my @args = split ' ', $arg ;
    push @args, time if @args == 2 ;

    if ( @args == 3 )
      { my ( $type, $path, $time ) = @args ;
        my ( $e, $r ) = $self -> do_PUSH
          ( 'PUSH', "$type $path $time", $thrd, '', 1 ) ;
        $err = $e if $e ;
      }
    return ( $err, undef ) if $err ;

    if ( ! $thrd -> var ( 'auth' ) )
      { $err = 'not authorised for commit' ;
        $thrd -> pool -> Del ( $thrd ) ;
      }
    elsif ( @args and @args != 3 )
      { $err = "bad $cmd args" ; }
    else
      { my $list = $thrd -> var ( Qdb::CNF_PUSH ) ;
        my $hist = $self -> hist ;
        logt ( "insert %d events ...", scalar @$list ) ;
        ( $err, $res ) = $hist -> insert_tups
          ( Qdb::TAB_EVTS, [ qw(type path time) ], $list ) ;
        logt ( "inserting done\n" ) ;
        @$list = () ;
      }
    $res ||= 0 ; # avoid $res eq '' or undefined
    ( $err, "COMMITTED $res" ) ;
  }

# Server
sub STATE
  { my $self = shift ;
    ( ( sprintf "-- hostname %s", $Util::hostname )
    ) ;
  }

###################################################################
package Fsck ;
use base qw(App) ;
__PACKAGE__ -> mk_getset ( qw() ) ;
OBB -> import ; Util -> import ;

use constant
  { PROG => 'qdb-fsck'
  } ;

sub Init
  { my $self = shift ;
    my %opts = ( @_ ) ;
    OBB::TT 'Fsck Init %s', $self ;
    $self -> App::Init ( %opts ) ;
    my $conf = $self -> conf ;
    $self -> check_conf -> Xit_on_errs ;
    OBB::TT 'Fsck Init %s', 'done' ;
    $self ;
  }

sub check_conf
  { my $self = shift ;
    my $errs = $self -> App::check_conf ;
    my $conf = $self -> conf ;
    unless ( my $secr = $conf -> secret )
      { $errs -> err ( 'no secret' ) ; }
    unless ( $conf -> server )
      { $errs -> err ( "no config for 'server'" ) ; }
    $errs ;
  }

##############################################################
package Qdb::Daemon ;
use base 'OBB' ;
__PACKAGE__ -> mk_getset ( qw(conf demn STOP) ) ;
use Fcntl qw(:flock) ;
OBB -> import ; Util -> import ;

our $PROG = Qdb::PROG ;
our $DEMN = 'daemon' ;
our $HOSTNAME = `hostname` ; chomp $HOSTNAME ;

sub Init
  { my $self = shift ;
    my %opts = @_ ;
    my $conf = $self -> conf ( $opts { conf } ) ;
    my $LOGF = $conf -> log_file ;
    my $demn = Proc::Daemon -> new
      ( work_dir     => '.'
      , child_STDOUT => ">>$LOGF"
      , child_STDERR => ">>$LOGF"
      , pid_file     => $conf -> pid_file
      ) ;
    $self -> demn ( $demn ) ;
    $self ;
  }

sub xlock
  { my $self = shift ;
    my $file = $self -> conf -> lck_file ;
    my $cnt  = 0 ;
    my $res  = 0 ;

    unless ( open LOCK, ">$file" )
      { syslog ( "exit ; can't write lock file [$file] ($!)" ) ; exit ; }
    while ( $cnt < 2 )
      { $cnt ++ ;
        if ( flock LOCK, LOCK_EX|LOCK_NB )
          { logd ( "got lock ; try[$cnt]" ) ;
            $res = 1 ; last ;
          }
        else
          { logd ( "can't get lock ; try[$cnt]" ) ; }
        sleep 5 ;
      }
    $res ;
  }

sub ulock { my $self = shift ; flock LOCK, LOCK_UN ; }

# pid # we're the parent
#  -1 # can't run ; other is already running or can't lock
#   0 # we're the daemon ; including opt{i}

sub start_daemon
  { my $self = shift ;
    my $opti = shift ;
    my $pid  = undef ;
    my $demn = $self -> demn ;
    my $msg  ;
    my $cnt  = 0 ;

    while ( $cnt < 3 and $pid = $demn -> Status ( undef ) )
      { $cnt ++ ; sleep 1 ; }

    if ( $pid and $pid != $$ )
      { $msg = "$PROG: some $DEMN is already running ; pid $pid" ;
        $pid = -1 ;
      }
    elsif ( ! $self -> xlock )
      { $msg = "$PROG: can't lock ; some $DEMN is already running" ;
        $pid = -1 ;
      }
    else
      { $self -> ulock ;
        $pid = ( $opti ? 0 : $demn -> Init () ) ;
        $msg = sprintf "$PROG: launched $DEMN on %s ; pid %s"
          , $HOSTNAME, $pid ;
      }
    return $pid, $msg ;
  }

sub send_stop
  { my $self = shift ;
    my $res  = 0 ;
    my $SOCK = IO::Socket::INET -> new
      ( PeerAddr => 'localhost'
      , PeerPort => $self -> conf -> port
      , Proto => 'tcp'
      ) ;
    if ( $SOCK )
      { printf $SOCK "STOP %s\n", ( $self -> conf -> read_stp || '_' ) ;
        $SOCK -> shutdown ( 1 ) ; # done writing
        my $line = <$SOCK> ;
        $res = 1 if $line and $line =~ 'STOPPED' ;
        $SOCK -> shutdown ( 2 ) ; # done using
      }
    else
      { logv ( "can't open sock ; nevermind" ) ; }
    $res ;
  }

sub send_state
  { my $self = shift ;
    my $res  = '' ;
    my $SOCK = IO::Socket::INET -> new
      ( PeerAddr => 'localhost'
      , PeerPort => $self -> conf -> port
      , Proto => 'tcp'
      ) ;
    if ( $SOCK )
      { printf $SOCK "STATE\n" ;
        $SOCK -> shutdown ( 1 ) ; # done writing
        $res = join '', <$SOCK> ;
        $SOCK -> shutdown ( 2 ) ; # done using
      }
    else
      { $res = "can't open sock to daemon" ; }
    $res ;
  }

sub send_sig
  { my $self = shift ;
    my $pid  = shift ;
    my $nam  = shift ;
    my $num  = Util::sig_num ( $nam ) ;
    ( defined $num
    ? ( ( kill $num, $pid ) ? '' : "can't $nam daemon ($pid)" )
    : "no num for signal $nam"
    ) ;
  }

sub stop_daemon
  { my $self = shift ;
    $self -> demn -> Kill_Daemon ( undef ) unless $self -> send_stop ;
  }

sub status { my $self = shift ; $self -> demn -> Status ( undef ) ; }

sub sss_exit
  { my $self = shift ;
    my $ARG  = shift ;
    my $opti = shift ;
    my $ownr = $self -> conf -> own_pid ;
    if ( $ARG and $ARG eq 'start' )
      { my ( $pid, $msg ) = $self -> start_daemon ( $opti ) ;
        if ( $pid )
          { # we are the parent
            syslog ( $msg ) ;
            my $xit = ( $pid < 0 ? 1 : 0 ) ;
            exit $xit unless $opti and $pid > 0 ;
          }
        else
          { # we have a running daemon
            OBB -> Verbosity ( $self -> conf -> loglevel ) ;
            Util::MODE ( 1 ) ;
            Util::rotate ( $self -> conf ) unless $opti ;
            logq ( "$0 start [$$] %s", Qdb -> Version ) ;
            my $sys_lock = $self -> conf -> sys_lock ;
            $self -> xlock ;
            $self -> conf -> make_stp ;
            $self -> conf -> make_sys_lock
              or syslog ( "can't write sys_lock [%s] ; nm", $sys_lock ) ;
            STDOUT -> autoflush ( 1 ) ;
            STDERR -> autoflush ( 1 ) ;
            syslog ( "daemon started" ) unless $opti ;
          }
      }

    elsif ( $ARG and $ARG eq 'state' )
      { logt ( $self -> send_state ) ; exit 0 ; }

#   elsif ( $< and ! defined $ownr )
#     { my $nam = getpwuid ( $< ) || $< ;
#       logt ( "$PROG: no owner ; $nam can't stat\n" ) ;
#       exit 1 ;
#     }

    elsif ( $< and $ownr != $< )
      { my $own = getpwuid ( $ownr ) || $ownr ;
        my $nam = getpwuid ( $< ) || $< ;
        logt ( "$PROG: owned by $own ; $nam can't stat\n" ) ;
        exit 1 ;
      }
   elsif ( $ARG and $ARG eq 'stop' )
      { my $pid = $self -> status ;
        my $msg = "$PROG: $DEMN is not running" ;
        my $xit = 0 ;
        if ( $pid )
          { my $cnt = $self -> stop_daemon ;
            $msg = sprintf "$PROG: %s $DEMN on %s ; pid %s"
              , ( $cnt ? 'stopped' : "can't stop" ), $HOSTNAME, $pid ;
            if ( $cnt )
              { $self -> conf -> rm_stp ; $self -> conf -> rm_sys_lock ; }
            $xit = ! $cnt ;
          }
        syslog ( $msg ) ;
        exit ( $xit || 0 ) ;
      }

    elsif ( $ARG and $ARG eq 'reload' )
      { my $pid = $self -> status ;
        my $msg = "$PROG: $DEMN is not running" ;
        if ( $pid ) { $msg = $self -> send_sig ( $pid, 'HUP' ) ; }
        logt ( $msg ) ;
        exit 0 ;
      }

    elsif ( $ARG and $ARG eq 'restart' )
      { my $pid = $self -> status ;
        my $msg = "$PROG: $DEMN is not running" ;
        if ( $pid ) { $msg = $self -> send_sig ( $pid, 'USR1' ) ; }
        logt ( $msg ) ;
        exit 0 ;
      }

    elsif ( my $pid = $self -> status )
      { logt ( "$PROG: $DEMN is running ; pid %s\n", $pid ) ;
        exit 0 ;
      }
    else
      { logt ( "$PROG: $DEMN is not running\n" ) ;
        exit 1 ;
      }
    $self ;
  }

##############################################################
package Qdb::Permit ;
# ad : 'allow' or 'deny'
# nm : a Net::Netmask
use base 'OBB' ;
__PACKAGE__ -> mk_getset ( qw(ad nm) ) ;
OBB -> import ; Util -> import ;

##############################################################
package Qdb::Allower ;
# a lst of allow/deny Netmask's ;
# a given ip is allowed if the first block that matches is 'allow'-ed
use base 'OBB' ;
__PACKAGE__ -> mk_getset ( qw(lst errs) ) ;
OBB -> import ; Util -> import ;
sub Defs { ( lst => [] ) ; }

sub Init
  { my $self = shift ;
    my %opts = @_ ;
    my $spec = $opts { spec } ;
    my $want = 'ad' ; # allow deny
    my $ad ;
    $self -> OBB::Init ( %opts ) ;
    $self -> errs ( Msgs -> Make ) unless $self -> errs ;
    $self -> err ( "empty spec for Qdb::Allower" ) unless @$spec ;
    for my $term ( @$spec )
      { last if $self -> err ;
        OBB::TT ( 'loop term %s', $term ) ;
        if ( $want eq 'ad' )
          { if ( $term =~ /^(allow|deny)$/ )
              { $ad = $term ; $want = 'ip' ; }
            else
              { $self -> err ( "expected allow/deny ; found $term" ) ; }
          }
        elsif ( $want eq 'ip' and $term =~ /^(allow|deny)$/ )
          { $self -> err ( "expected host/ip-block ; found $term" ) ; }
        else # ( ( $want eq '' and $term !~ ) or $want eq 'ip' )
          { if ( $term =~ /^(allow|deny)$/ )
              { $ad = $term ; $want = 'ip' ; }
            else
              { $want = '' ;
                my $ips = Util::get_host_ips ( $term ) ;
                if ( $ips )
                  { for my $ip ( @$ips )
                      { push @{ $self -> lst }, Qdb::Permit -> Make 
                          ( ad => $ad , nm => Net::Netmask -> new ( $ip ) ) ;
                        OBB::TT ( 'push ip %s term %s', $ip, $term ) ;
                      }
                  }
                else
                  { $self -> errs -> wrn ( "can't resolve $term" ) ; }
              }
          }
      }

    if ( ! $self -> err and $want )
      { my $exp = ( $want eq 'ad' ? "allow/deny" : "host/ip-block" ) ;
        $self -> err ( "expected $exp ; found end-of-list" ) ;
      }
    $self ;
  }

sub err
  { my $self = shift ;
    $self -> errs -> err ( shift ) if @_ ;
    $self -> errs -> errs ;
  }

sub is_allowed
  { my $self = shift ;
    my $ip   = shift ;
    my $list = $self -> lst ;
    for my $block ( @$list )
      { my $ad = $block -> ad ;
        my $nm = $block -> nm ;
        my $match =
          ( $nm -> bits == 32
          ? ( $nm -> base eq $ip ) || 0
          : $nm -> match ( $ip )
          ) ;
        OBB::TT ( 'loop ad %s nm %s match', $ad, $nm -> base, $match ) ;
        if ( $match ) { return $ad eq 'allow' ; }
      }
    0 ;
  }

1 ;
package main ;
#! /usr/bin/perl

use strict ;
use warnings ;

use lib '.' ;

use JSON::XS ;
use DBI ;
use IO::File ;
use IO::Pipe ;
use IO::Select ;
Qdb -> import ;

my $DEF_CPAN = '/var/ftp/mirror/CPAN' ;
my $DEF_LITE = "cpan.lite" ;
my $DEF_MAX  = 10 ;
my $TAB_EVTS = Qdb::TAB_EVTS ;
my $TAB_META = Qdb::TAB_META ;
my $DEF_FIND = "/tmp/FIND.cpan" ;
my $IGNR_PAT = qr{^(local/|RECENT-)} ;

my $prog = substr $0, rindex ( $0, '/' ) + 1 ;
my $Usage = <<USAGE ;
Usage: $prog [-vqdTfhirsD] [-C [1]] [-m max] [-c conf]
option v : be verbose
option q : be quiet
option d : show debug info
option T : show trace info
option f : action ; otherwise dry-run ; implies "-C 1"
option h : show help ; exit
option i : show meta data ; exit ;
option r : get events from RECENT-files ; default : /path/to/cpan
option s : send events to server instead of the database
option m : use \$max paralel finds ; default $DEF_MAX
option D : delete all events ; increment meta.scheme ; exit
option C : read/write the find-list cache $DEF_FIND
           -C   : read the cache
           -C 1 : write the cache
option c : use config \$conf ; default [ @{[join ', ', Conf::FILES]} ]
-----------------------------------------------------------------------
$prog collects new events, and updates the event-history.
-- By default, $prog uses find(1) to list all objects in \$cpan,
   and compares the list with the database, resulting in a list
   of new/delete-events.
   Alternatively, with option '-r', $prog inspects CPAN's RECENT-files,
   and selects events that are not in the database.
-- Then, by default, the collected events are inserted into the database ;
   Alternatively, with -s, they are sent to the configured server.
-----------------------------------------------------------------------
USAGE
sub Usage { die "$_[0]$Usage" ; }
sub Error { die "[error] $prog: $_[0]\n" ; }
sub Warn  { warn "[warn] $prog: $_[0]\n" ; }

# usage: &GetOptions(ARG,ARG,..) defines $opt_ID as 1 or user spec'ed value
# usage: &GetOptions(\%opt,ARG,ARG,..) defines $opt{ID} as 1 or user value
# ARG = 'ID' | 'ID=SPC' | 'ID:SPC' for no-arg, required-arg or optional-arg
# ID  = perl identifier
# SPC = i|f|s for integer, fixedpoint real or string argument

use Getopt::Long ;
Getopt::Long::config ( 'no_ignore_case' ) ;
my %opt = () ; Usage '' unless GetOptions
  ( \%opt, qw(v q d T f h i m=i D C:s c=s s r) ) ;
Usage "Arg count\n" if @ARGV > 0 ;

$opt{C} = 1 if $opt{f} ;
$opt{v} ||= $opt{d} ||= $opt{T} ;
OBB -> Verbosity ( 'Silent'  ) if $opt{s} ;
OBB -> Verbosity ( 'Quiet'   ) if $opt{q} ;
OBB -> Verbosity ( 'Verbose' ) if $opt{v} ;
OBB -> Verbosity ( 'Debug'   ) if $opt{d} ;
OBB -> Verbosity ( 'Trace'   ) if $opt{T} ;

my $conf = Conf -> Make ( file => $opt{c} ) ; # or die
$conf -> Dmp ( 'config' ) if $opt{d} ;

my $threads = Threads -> Make ( conf => $conf ) ;
my $SENDER  = Fsck -> Make ( conf => $conf, threads => $threads ) ;

my $LITE = $conf -> dbs_file ;
my $CPAN = $conf -> cpan ;
my $MAX = $opt{m} || $DEF_MAX ;
my $TAG = $opt{f} ? 'DOING' : 'WOULD' ;
my $NOX = $opt{h} || $opt{i} || $opt{f} ;

if ( $opt{h} ) { print $Usage ; exit ; }

sub logr
  { my $fmt = shift ;
    my $txt = $fmt ;
    if ( @_ ) { $txt = sprintf $fmt, @_ ; }
    chomp $txt ;
    printf "\$^T + %-4s %s\n", time - $^T, $txt ;
  }

sub logx { logr @_ if ! $opt{q} ; }
sub logv { logr @_ if   $opt{v} ; }
sub logd { logr @_ if   $opt{d} ; }
sub logT { logr @_ if   $opt{T} ; }

sub create_sql
  { <<SQL ;
CREATE TABLE IF NOT EXISTS $TAB_EVTS
  ( id   integer PRIMARY KEY NOT NULL
  , type text NOT NULL CHECK ( type = 'new' or type = 'delete' )
  , path text NOT NULL UNIQUE ON CONFLICT REPLACE
  , time int  NOT NULL DEFAULT (strftime('%s','now'))
  )
---
CREATE TABLE IF NOT EXISTS $TAB_META
  ( id  integer PRIMARY KEY NOT NULL
  , key text NOT NULL UNIQUE ON CONFLICT REPLACE
  , val NOT NULL
  , time int NOT NULL DEFAULT (strftime('%s','now'))
  )
---
INSERT or IGNORE INTO meta ( key, val ) VALUES ( 'scheme', 1 ) ;
---
INSERT or IGNORE INTO meta ( key, val ) VALUES ( 'epoch',  0 ) ;
SQL
  }

sub mk_hist
  { my $path = shift ;
    logv "connecting to %s ...\n", $path ;
    my $hist = Qdb::Hist -> Make ( file => $path )
      or Error ( "can't mk_dbh $path" ) ;
    my $dbh = $hist -> dbh ;
    $dbh -> do ( 'PRAGMA cache_size=1000000' ) ;
    unless ( $opt{s} or $opt{i} )
      { $dbh -> do ( $_ ) for split /---\n/, create_sql ; }
    $hist ;
  }

# returns [rows] or undef
sub db_get
  { my $dbh  = shift ;
    my $sql  = shift ;
    my @args = @_ ;
    my $arg0 = $args [ 0 ] ;
    my $res  = [] ;
    # make $args into a [[]]
    my $args ;
    if ( @args == 0 or not ref $arg0 )
      { $args = [ [ @args ] ] ; }
    elsif ( @$arg0 == 0 or not ref ( $arg0 -> [0] ) )
      { $args = [ @args ] ; }
    else
      { $args = $arg0 ; }
    my $sth = $dbh -> prepare ( $sql ) or return undef ;
    for my $tup ( @$args )
      { logd "$sql WITH [%s]", join ',', @$tup ;
        $sth -> execute ( @$tup ) or return undef ;
        while ( my @row = $sth -> fetchrow_array )
          { if ( @row )
              { push @$res, \@row ; }
            else
              { if ( my $err = $sth -> err ) { Error "fetch failed [$err]" ; }
                last ;
              }
          }
      }
    $res ;
  }

# returns #affected-rows || 0E0 or undef
sub db_do
  { my $dbh  = shift ;
    my $sql  = shift ;
    my @args = @_ ;
    my $arg0 = $args [ 0 ] ;
    my $args ;
    if ( @args == 0 or not ref $arg0 )
      { $args = [ [ @args ] ] ; }
    elsif ( @$arg0 == 0 or not ref ( $arg0 -> [0] ) )
      { $args = [ @args ] ; }
    else
      { $args = $arg0 ; }
    my $sth = $dbh -> prepare ( $sql ) or return undef ;
    $dbh -> begin_work or return undef ;
    for my $tup ( @$args )
      { logd "$TAG $sql WITH [%s]", join ',', @$tup ;
        if ( $opt{f} and not $sth -> execute ( @$tup ) )
          { $dbh -> rollback ; return undef ; }
      }
    $dbh -> commit ;
    $sth -> rows || '0E0' ;
  }

sub cnt_sth
  { my $sth = shift ;
    $sth -> execute ( @_ ) || Error $sth -> {Database} -> errstr ;
    my $row = $sth -> fetchrow_arrayref ;
    my $res = $row -> [ 0 ] ;
    Error "cnt_sth : undef for [@_]" unless defined $res ;
    $res ;
  }

sub _get_todo
  { my $res = shift ;
    my $dir = shift ;
    Error "_get_todo : not a directory [$dir]" unless -d $dir ;
    opendir DIR, $dir or Error "can't opendir $dir ($!)" ;
    $res -> { $_ } ++ for map "$dir/$_", grep !/^\.\.?$/, readdir DIR ;
    closedir DIR ;
    delete $res -> { $dir } ;
  }

sub get_todo
  { my $res = {} ;
    return $res if defined $opt{C} and not $opt{C} ; # cache-read
    _get_todo $res, $CPAN ;
    _get_todo $res, "$CPAN/authors" ;
    _get_todo $res, "$CPAN/authors/id" ;
    _get_todo $res, "$CPAN/modules" ;
    _get_todo $res, "$CPAN/modules/by-module" ;
    _get_todo $res, "$CPAN/modules/by-category" ;
    $res ;
  }

sub insert_list
  { my $hist = shift ;
    my $lst  = shift ;
    my $tag  = shift ;
    OBB::logt ( "$TAG insert %d $tag events ...", scalar @$lst ) ;
    if ( $opt{f} )
      { my ( $err, $res ) = $hist -> insert_events ( $lst ) ;
        Error $err if $err ;
        OBB::logt ( "inserting done" ) ;
      }
    @$lst = () ;
  }

sub get_res
  { my $sock = shift ;
    my $line = <$sock> ;
    Server::Result -> from_text ( $line ) -> res ;
  }

sub send_txt
  { my $txt = shift ;
    my $secr = $conf -> secret ;
    my $serv = $conf -> server ; $serv .= ':' . $conf -> port
      unless $serv =~ /:\d+$/ ;
    my $SOCK = IO::Socket::INET -> new ( PeerAddr => $serv, Proto => 'tcp' ) ;
    if ( $SOCK )
      { print $SOCK "AUTH1\n" ;
        my $chal = get_res $SOCK ;
        my $resp = Util::mk_sum ( "$chal $secr\n" ) ;
        print $SOCK "AUTH2 $resp\n" ;
        my $auth = get_res $SOCK ;
        Error "can't authenticate" unless $auth ;
        print $SOCK $txt ;
        print $SOCK "\n" if substr $txt, -1 ne "\n" ;
        print $SOCK "quit\n" ;
        $SOCK -> shutdown ( 1 ) ; # done writing
        my @lines = <$SOCK> ;
        $SOCK -> shutdown ( 2 ) ; # done using
        for my $line ( @lines )
          { my $sres = Server::Result -> from_text ( $line ) ;
            my $cmd = $sres -> cmd ;
            my $err = $sres -> err ;
            my $res = $sres -> res ;
            if ( $err )
              { Error $sres -> err ; }
            elsif ( $cmd ne 'QUIT' )
              { printf "%s : %s\n", $cmd, $res ; }
          }
      }
    else
      { Error "can't open sock to $serv" ; }
  }

sub send_list
  { my $lst = shift ;
    my $tag = shift ;
    logx "$TAG send %d $tag events ...", scalar @$lst ;
    return unless $opt{f} ;
    my $txt = join "\n",
      ( ( map { sprintf ( 'push %s %s %s', @$_ ) ; } @$lst )
      , 'commit'
      ) ;
    send_txt $txt ;
  }

sub send_event
  { my $evt = shift ;
    logx sprintf "$TAG commit 1 event [%s]", join ',', @$evt ;
    return unless $opt{f} ;
    send_txt sprintf 'commit %s %s %s', @$evt ;
  }

sub send_ping { my @r = send_txt "ping" ; shift @r ; }
sub test { my ( $err, $res ) = send_ping ; exit ; } 

sub find_adds
  { my $hist = shift ;
    my $dbh  = $hist -> dbh ;
    my $todo = get_todo ;
    my $LEN = 1 + length $CPAN ;
    my $BUF = 10 * 1024 ;
    my %nam = () ;
    my %buf = () ;
    my $sel = IO::Select -> new ;
    my $add = [] ;
    my $fnd = 0 ;
    my %fnd ;
    my $res ;
    my $sql = "select count(*) from $TAB_EVTS where type = ? and path = ?" ;
    my $sth = $dbh -> prepare ( $sql ) or Error $dbh -> errstr ;
    logv "reading $CPAN ; looking for items to insert ..." ;
    new IO::File "file", "r";
    my $FIND ;
    if ( $opt{C} )
      { $FIND = new IO::File $DEF_FIND, 'w'
          or Error "can't write $DEF_FIND ($!)" ;
      }
    elsif ( defined $opt{C} )
      { $FIND = new IO::File $DEF_FIND, 'r'
          or Error "can't open $DEF_FIND ($!)" ;
        $sel -> add ( $FIND ) ;
        $BUF *= 2 ;
      }
    for my $full ( sort keys %$todo )
      { if ( -f $full or -l $full )
          { delete $todo -> { $full } ;
            my $path = substr $full, $LEN ;
            if ( $path =~ $IGNR_PAT )
              { printf "*** ignore todo %s\n", $path if $opt{v} ; }
            else
              { my $time ;
                unless ( cnt_sth $sth, 'new', $path )
                  { $time = ( lstat $full ) [ 9 ] ;
                    Error "no time for $full" unless defined $time ;
                    push @$add, [ 'new', $path, $time ] ;
                    logx "new %s\n", $path ;
                  }
                $res -> { $path } ++ ;
                $fnd ++ ;
                printf $FIND sprintf
                  ( "%s %s %s\n"
                  , ( -l $full ? 'l' : 'f' )
                  , ( $time || ( ( lstat $full ) [ 9 ] ) || 0 )
                  , $full
                  ) if $opt{C} ;
              }
          }
        elsif ( not -d $full )
          { Error "weird path [$full]" ; delete $todo -> { $full } ; }
      }

    my @todo = sort keys %$todo ;

    while ( $sel -> count or @todo )
      { while ( $sel -> count < $MAX and @todo )
          { my $path = shift @todo ;
            my @find = ( 'find', $path, '-printf', '%y %T@ %p\n' ) ;
            my $find = sprintf "find '%s'", join "' '", @find[1..$#find] ;
            my $pipe = new IO::Pipe ;
            $pipe -> reader ( @find ) or Error "can't popen $find ($!)" ;
            $sel -> add ( $pipe ) ;
            my $nam = $nam { $pipe } = substr $path, $LEN ;
            $fnd { $pipe } = 0 ;
            logd "opened %s\n", $nam ;
            logd "  busy %d ; todo %d\n", $sel -> count, scalar @todo ;
          }
        for my $fh ( $sel -> can_read )
          { my $buf = $buf { $fh } || '' ;
            my $len = sysread $fh, $buf, $BUF, length $buf ;
            if ( $len )
              { for ( my $idx = index $buf, "\n" 
                    ; $idx != -1
                    ; $idx = index $buf, "\n"
                    )
                  { my $line = substr $buf, 0, $idx + 1, '' ;
                    print $FIND $line if $opt{C} ;
                    my ( $ftyp, $time, $path ) = split ' ', $line ;
                    $path = substr $path, $LEN ;
                    if ( $path =~ $IGNR_PAT )
                      { printf "*** ignore find %s\n", $path if $opt{v} ; }
                    else
                      { unless ( $ftyp eq 'd' or cnt_sth $sth, 'new', $path )
                          { push @$add, [ 'new', $path, int $time ] ;
                            logx "new %s\n", $path ;
                          }
                        $res -> { $ftyp eq 'd' ? "$path/" : $path } ++ ;
                        $fnd { $fh } ++ ;
                        $fnd ++ ;
                      }
                  }
                $buf { $fh } = $buf ;
              }
            else
              { my $nam = $nam { $fh } ;
                Error "pipe %s buf [%s]\n" if $buf { $fh } ;
                $sel -> remove ( $fh ) ;
                $fh  -> close or Error "can't close $nam ($!)" ;
                delete $buf { $fh } ;
                logd "closed %s\n", $nam ;
                logd "  found %s ; total %s\n", $fnd { $fh }, $fnd ;
              }
            if ( $opt{s} )
              { send_event shift ( @$add ), 'new' if @$add ; }
            else
              { insert_list $hist, $add, 'new' unless @$add < 100000 ; }
          }
      }
    close $FIND if defined $opt{C} ;
    $sth -> finish ;
    if ( @$add )
      { if ( $opt{s} )
          { send_list $add, 'new' ; }
        else
          { insert_list $hist, $add, 'new' ; }
      }
    $res ;
  }

sub find_dels
  { my $hist = shift ;
    my $hav = shift ;
    my $dbh = $hist -> dbh ;
    my $del = [] ;
    my $cnt = 0 ;
    logv "reading $LITE ; looking for items to delete ..." ;
    my $sql = "select type, path from $TAB_EVTS" ;
    my $sth = $dbh -> prepare ( $sql ) or Error $dbh -> errstr ;
    $sth -> execute ;
    while ( my $row = $sth -> fetchrow_arrayref )
      { my $type = $row -> [ 0 ] ;
        my $path = $row -> [ 1 ] ;
        Error "del_find : path is undef" unless defined $path ;
        if ( $type eq 'new' and ! $hav -> { $path } )
          { push @$del, [ 'delete', $path, $^T ] ; logx "delete $path" ; }
        $cnt ++ ;
      }
    logv "in \$CPAN %s ; in $LITE %s", scalar keys %$hav, $cnt ;
    if ( @$del )
      { if ( $opt{s} )
          { send_list $del, 'delete' ; }
        else
          { insert_list $dbh, $del, 'delete' ; }
      }
    $del ;
  }

sub show_info
  { my $dbh = shift ;
    my $tups = db_get $dbh, "select key, val from $TAB_META order by key"
      or Error $dbh -> errstr ;
    my $cnt  = db_get $dbh, "select count(*) from $TAB_EVTS"
      or Error $dbh -> errstr ;
    printf "meta   :\n  %s\n", join "\n  "
      , map { sprintf "%-10s : %s", $_ -> [ 0 ], $_ -> [1] } @$tups ;
    printf "events : %s\n", $cnt -> [ 0 ] [ 0 ] ;
  } ;

sub re_init
  { my $dbh = shift ;
    db_do $dbh, "delete from $TAB_EVTS" or Error $dbh -> errstr ;
    if ( $opt{f} ) { $dbh -> do ( 'vacuum' ) or Error $dbh -> errstr ; }
    db_do $dbh, "update $TAB_META SET val = val + 1 WHERE key = 'scheme'"
      or Error $dbh -> errstr ;
  } ;

sub add_all_recents
  { my $hist = shift ;
    my $rcnt = Util::get_json ( "$CPAN/RECENT.recent" ) 
      or Error "can't get $CPAN/RECENT.recent" ;
    my $aggr = $rcnt -> {meta} {aggregator}
      or Error "can't get meta/aggregator" ;
    my $time = 0 ;
    for my $tag ( reverse @$aggr )
      { my $file = "$CPAN/RECENT-$tag.json" ;
        printf "getting $file ...\n" ;
        my $json = Util::get_json $file or Error "can't get $file" ;
        my $list = $json -> {recent} ;
        my $add  = [] ;
        while ( @$list )
          { my $item = pop @$list ;
            my $epoch = $item -> {epoch} ;
            if ( $epoch > $time )
              { push @$add 
                  , [ $item -> {type}
                    , $item -> {path}
                    , int ( $epoch )
                    ] ;
                $time = $epoch ;
                insert_list $hist, $add, 'rrr' unless @$add < 100000 ;
              }
          }
        insert_list $hist, $add, 'rrr' if @$add ;
      }
    $time ;
  }

sub add_last_event
  { my $hist = shift ;
    my $rcnt = Util::get_json ( "$CPAN/RECENT.recent" ) 
      or Error "can't get $CPAN/RECENT.recent" ;
    my $aggr = $rcnt -> {meta} {aggregator}
      or Error "can't get meta/aggregator" ;
    my $tag  = shift @$aggr ;
    my $file = "$CPAN/RECENT-$tag.json" ;
    printf "getting $file ...\n" ;
    my $json = Util::get_json $file or Error "can't get $file" ;
    my $list = $json -> {recent} ;
    my $item = shift @$list ;
    my $epoch = $item -> {epoch} ;
    my $event = [ $item -> {type} , $item -> {path} , $epoch ] ;
    insert_list $hist, [ $event ], 'rrr' ;
    $epoch ;
  }

my $hist = mk_hist $LITE ;
my $dbh  = $hist -> dbh ;

if ( $opt{D} ) { re_init $dbh ; show_info $dbh if $opt{f} ; exit ; }
if ( $opt{i} ) { show_info $dbh ; test ; exit ; }

my $epoch = $hist -> get_meta ( 'epoch' ) ;

if ( $epoch == 0 )
  { $epoch =
      ( $conf -> db_init
      ? add_all_recents $hist
      : add_last_event $hist
      ) ;
    if ( $opt{f} )
      { $hist -> set_meta ( 'epoch', $epoch ) ; }
    else
      { exit ; }
  }

if ( $opt{r} )
  { my $add = Util::get_events_since ( $epoch, $CPAN ) ;
    unless ( ref $add )
      { Error $add ; }
    elsif ( @$add )
      { my $epoch = $add -> [ $#$add ] -> [ 2 ] ;
        if ( $opt{s} )
          { send_list $add, 'rrr' ; }
        else
          { insert_list $hist, $add, "rrr" ; }
        $hist -> set_meta ( 'epoch', $epoch ) if $opt{f} ;
      }
  }
else
  { my $have = find_adds $hist ;
    my $del  = find_dels $hist, $have ;
  }

logv 'done' ;

END { printf "*** this is a DRY-RUN ***\n" unless $NOX ; }
