понедельник, 16 марта 2015 г.

Мастеринг связанных процессов в linux

Это развитие темы, поднятой в этой статье. Напомню, в ней был представлен способ гарантированного перезапуска сбоящего приложения, основанный на простой модели процессов мастер + воркер. Единственной задачей главного процесса (мастера) был немедленный перезапуск дочернего процесса (воркера) в случае завершения последнего в результате посылки ядром сигнала SIGSEGV. Новая задача будет сформулирована по-другому. Пусть у нас имеется два разных приложения. Нужно гарантировать, во-первых, что оба приложения будут выполняться одновременно, во-вторых, что перезапуск одного из приложений (после нормального завершения или получения сигнала) будет приводить к перезапуску второго приложения, и в-третьих, собственно перезапуск приложений в случае нормального завершения или получения заданных сигналов (пусть это будет SIGSEGV для определенности). Первые два условия означают, что гарантируется уникальность пар экземпляров двух приложений в любой момент времени: под связанностью процессов в заголовке статьи я подразумевал именно это. Условие связанности процессов может быть востребовано в случае, если один из них играет роль бэкенда, хранящего авторизационную информацию клиента, и общающегося с другим процессом — фронтэндом, непосредственно обслуживающим соединение с клиентом, через транспорт, не предоставляющий гарантий сохранения экземпляров взаимодействующих процессов, например TCP или UNIX-сокеты. Если условие связанности не будет выполняться, то перезапуск бэкенда приведет к утере авторизационной информации, в то время как клиентские сессии на фронтэнде останутся невредимы. Перезапуск фронтэнда в этом случае позволил бы перезагрузить переставшие быть валидными клиентские сессии. Эта проблема, на первый взгляд, кажется немного надуманной, но все же может возникнуть в реальности. Например, вам может понадобиться разработать приложение-бэкенд к сервису slapd, общающееся с последним через механизм slapd-sock. В этом случае slapd будет являться фронтэндом вашего приложения, который будет обязан перезапускать клиентские сессии (в рамках нашей задачи — “перезапускаться” сам) в случае завершения или падения бэкенда. В реальности перезапуском обеих частей нашего сервиса, как и прежде, будет заниматься мастер-процесс. Ниже я привожу исходный код соответствующей реализации, построчные комментарии ниже. Многие части полностью соответствуют коду из оригинальной статьи: их я комментировать не стану. Название файла с исходным кодом — main2.cpp.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
#include <unistd.h>
#include <sys/wait.h>
#include <sys/prctl.h>
#include <sys/time.h>
#include <string.h>
#include <signal.h>
#include <stdlib.h>
#include <errno.h>
#include <iostream>
#include <iomanip>

#ifdef MAXCYCLES
#define LOOPSTOPCOND      i < MAXCYCLES
#else
#define LOOPSTOPCOND
#endif

#ifndef PDEATHQUITSIGNAL
#define PDEATHQUITSIGNAL  SIGTERM
#endif

#ifndef CHILDLIFETIME
#define CHILDLIFETIME     4
#endif

static int  pid0( 0 );
static int  pid1( 0 );
static int  pid2( 0 );

static struct timeval  start;


static inline std::ostream &  tprint( std::ostream &  out = std::cout,
                                      const std::string &  delim = " | " )
{
    struct timeval  tv;
    gettimeofday( &tv, NULL );
    int  ms( ( tv.tv_sec - start.tv_sec ) * 1000 +
             ( tv.tv_usec - start.tv_usec ) / 1000 );
    return out << std::setw( 7 ) << float( ms ) / 1000 << delim;
}


static void  inth( int  sig )
{
    if ( getpid() != pid0 )
        exit( 0 );

    tprint() << "Master terminated by signal " << sig << std::endl;
    if ( pid1 > 0 )
    {
        tprint() << "Sending signal " << sig << " to worker 1" << std::endl;
        kill( pid1, sig );
        waitpid( pid1, NULL, 0 );
    }
    if ( pid2 > 0 )
    {
        tprint() << "Sending signal " << sig << " to worker 2" << std::endl;
        kill( pid2, sig );
        waitpid( pid2, NULL, 0 );
    }
    exit( 0 );
}


static void  setinth( void ( *handler )( int ) )
{
    struct sigaction  act;
    memset( &act, 0, sizeof( act ) );
    act.sa_handler = handler;

    int  ints[] = { SIGINT, SIGQUIT, SIGTERM, SIGHUP, 0 };

    for ( int *  s( ints ); *s != 0; ++s )
        sigaction( *s, &act, NULL );
}


int  main( int  argc, char **  argv )
{
    std::cout.precision( 3 );
    std::cout.setf( std::ios::fixed );

    gettimeofday( &start, NULL );

    pid0 = getpid();
    tprint() << "Master: " << pid0 << std::endl;

    setinth( inth );

    for ( int  i( 0 ); LOOPSTOPCOND; ++i )
    {
        if ( ( pid1 = fork() ) == 0 )    /* Worker process 1 */
        {
            pid2 = 0;
            if ( prctl( PR_SET_PDEATHSIG, PDEATHQUITSIGNAL ) == -1 )
            {
                tprint() << "Worker 1: failed to set parent death signal, "
                        "exiting" << std::endl;
                return 1;
            }
            setinth( SIG_DFL );

            tprint() << "(cycle " << i << ") Worker 1: " << getpid() <<
                    std::endl;
            sleep( CHILDLIFETIME );

            break;
        }

        if ( pid1 < 0 )
        {
            tprint() << "Failed to fork a worker 1 process, exiting" <<
                    std::endl;
            return 1;
        }

        if ( ( pid2 = fork() ) == 0 )    /* Worker process 2 */
        {
            pid1 = 0;
            if ( prctl( PR_SET_PDEATHSIG, PDEATHQUITSIGNAL ) == -1 )
            {
                tprint() << "Worker 2: failed to set parent death signal, "
                        "exiting" << std::endl;
                return 1;
            }

            tprint() << "(cycle " << i << ") Worker 2: " << getpid() <<
                    std::endl;

            char * const  cmd[] = { ( char * )"test_slapd",
                                    ( char * )"-d",
                                    ( char * )"0",
                                    ( char * )"-h",
                                    ( char * )"ldap://localhost:3333/",
                                    ( char * )"-f",
                                    ( char * )"nullslapd.conf",
                                    NULL };
            execve( "/usr/sbin/slapd", cmd, NULL );

            tprint() << "Failed to exec slapd process, exiting" << std::endl;
            return 1;
        }

        if ( pid2 < 0 )
        {
            tprint() << "Failed to fork a worker 2 process, exiting" <<
                    std::endl;
            return 1;
        }

        bool       respawn( false );
        siginfo_t  siginfo;

        if ( waitid( P_ALL, 0, &siginfo, WEXITED | WSTOPPED ) == -1 )
        {
            tprint() << "waitid() error '" << strerror( errno ) <<
                    "', exiting" << std::endl;
            return 1;
        }

        int        cpid( siginfo.si_pid == pid1 ? 1 :
                         ( siginfo.si_pid == pid2 ? 2 : -1 ) );

        if ( cpid == -1 )
        {
            tprint() << "Bad child pid " << siginfo.si_pid <<
                    ", exiting" << std::endl;
            return 1;
        }

        int *      ppid( cpid == 1 ? &pid1 : &pid2 );

        *ppid = 0;

        tprint() << "Worker " << cpid;

        if ( siginfo.si_code == CLD_KILLED || siginfo.si_code == CLD_DUMPED )
        {
            int  sig( siginfo.si_status );

            std::cout << " was signaled " << sig << std::endl;

            switch ( sig )
            {
            case SIGSEGV:
                respawn = true;
            default:
                break;
            }
        }
        else
        {
            std::cout << " exited with status " << siginfo.si_status <<
                    std::endl;
            respawn = true;
        }

        cpid = cpid == 1 ? 2 : 1;

        tprint() << "Sending quit signal to worker " << cpid << std::endl;

        ppid = cpid == 1 ? &pid1 : &pid2;
        kill( *ppid, PDEATHQUITSIGNAL );
        waitpid( *ppid, NULL, 0 );
        *ppid = 0;

        if ( ! respawn )
            break;
    }

    return 0;
}
В строках 26–28 объявлены глобальные переменные pid0, pid1 и pid2, которые в дальнейшем, в функции main(), будут инициализированы значениями PID мастер-процесса, воркера-бэкенда и воркера-фронтэнда соответственно. Их необходимо сделать глобальными, поскольку обработчик сигнала inth(), о котором речь пойдет ниже, нуждается в доступе к ним. Если вам не хочется засорять глобальное пространство имен, то поместите объявления этих переменных внутрь анонимного namespace — все же таки на C++ пишем! В строке 30 объявлена еще одна глобальная переменная start, которая будет инициализирована в функции main() текущим значением времени. Она объявлена глобальной, поскольку к ней требуется доступ из функции tprint(), расположенной в строках 33–41. Функция tprint() очень полезна, она выводит в выходной поток (предположительно std::cout или std::cerr) время, прошедшее с начала старта программы. В строках 44–63 определена функция inth()обработчик прерывания мастер-процесса. Эта функция посылает тот же сигнал прерывания sig, которым был прерван мастер-процесс обоим дочерним процессам. Но предварительно она проверяет, что вызвавший ее процесс является мастером, сравнивая вызов getpid() с pid0. Ниже вы увидите, что воркер-бэкенд устанавливает все сигналы, которые обрабатываются в inth() в значение по умолчанию SIG_DFL, а воркер-фронтэнд вызывает execve(), которая в конечном итоге делает то же самое. Спрашивается, зачем тогда нужна эта проверка? Как известно, новый процесс после вызова fork() наследует обработчики сигналов родителя, соответственно существует очень короткий промежуток времени между рождением процесса и установкой его собственных обработчиков сигналов, в течение которого, если этот новый процесс будет прерван, в нем будет вызван родительский обработчик inth(), а это очень плохо. Поэтому проверка на pid0 в обработчике inth() необходима. Собственно обработчик прерывания настраивается в функции setinth(), определенной в строках 66–76. Этот код оформлен в виде отдельной функции, поскольку нашему воркеру-бэкенду понадобится вернуть обработчики прерываний в исходные значения SIG_DFL. Переходим к функции main(). В строках 81–82 настраивается форматирование потока cout для вывода времени. В строке 84 инициализируется значение глобальной переменной start, которая будет использоваться для вычисления времени, прошедшего с начала старта программы, в функции tprint(). В строке 89 настраиваются обработчики прерывания мастер-процесса. В строках 93–109 внутри цикла for, перезапускающего воркер-процессы (см. оригинальную статью), находится код воркера-бэкенда. Его задача простая — установить сигнал смерти родителя с помощью вызова функции prctl(), восстановить обработчики прерывания по умолчанию с помощью вызова setinth(), вывести сообщение о своем старте и просто заснуть на время, определенное в секундах в макросе CHILDLIFETIME, который по умолчанию равен 4 и может быть задан во время компиляции. Волшебный вызов prctl() нужен для гарантированного завершения воркера в случае смерти мастера. Зачем, спросите вы. Ведь мы и так посылаем сигнал прерывания воркерам из обработчика inth(). Верно, но если мастер будет убит сигналом SIGKILL, этот обработчик вызван не будет, и воркеры перейдут процессу init. Данный вызов prctl() гарантирует посылку заданного сигнала, который настраивается нашим макросом PDEATHQUITSIGNAL, в случае смерти родителя, даже если тот был убит сигналом SIGKILL. В строках 111–116 — банальная проверка на правильную отработку fork(). Далее, в строках 118–143 идет код воркера-фронтэнда, который запускает экземпляр исполняемого файла /usr/sbin/slapd с помощью вызова execve(). Перед этим, как и в случае с воркером-бэкендом, устанавливается сигнал смерти родителя и выводится сообщение о старте. Переустановка обработчиков сигналов не требуется, поскольку вызов execve() устанавливает обработчики в значения по умолчанию. Функция execve() принимает список строк cmd, который будет передан как массив строк argv в функцию main() нового исполняемого кода. Если execve() будет выполнен успешно, то код, следующий за ним (строки 141–142), выполняться не будет, другими словами в этих строках находится код, отвечающий за обработку ошибки execve(). Итак, в массиве cmd находится список опций командной строки исполняемого файла /usr/sbin/slapd. Первый элемент — это имя процесса. Если бы мы запустили slapd из командной строки оболочки, оно бы соответствовало slapd, в нашем случае оно будет test_slapd. Остальные опции подобраны таким образом, чтобы slapd можно было запустить без отрыва от терминала (-d 0) обычному пользователю (-h ldap://localhost:3333/ -f nullslapd.conf). Пустой файл nullslapd.conf необходимо предварительно создать в текущей директории. Есть одна интересная тонкость. Если в опции cmd добавить -u <user>, то сигнал смерти родителя сбросится. То есть в случае посылки мастеру сигнала SIGKILL процесс test_slapd не завершится, а поменяет родителя на процесс init, а это не то, что мы ожидаем. Это связано с тем, что опция -u приводит к вызовам setgid() и setuid() внутри кода slapd, а это приводит к сбросу сигнала смерти родителя (см. man prctl). Единственный способ предотвратить это — пропатчить исходный код slapd. В строках 145–150 — банальная проверка на правильную обработку fork() для воркера-фронтэнда. Обратите внимание, что между двумя воркерами нет никакого взаимодействия: данный пример просто не рассчитан на такие подробности. Зато ниже идет код, который будет обрабатывать завершение одного из воркеров вследствие нормального выхода или прерывания сигналом. Главную работу выполняет функция waitid(), которая ожидает завершения любого из потомков мастер-процесса (строки 155–160). Переменная cpid инициализируется значением 1, если был завершен воркер-бэкенд (воркер 1), или 2, если был завершен воркер-фронтэнд (воркер 2). После определения завершившегося процесса соответствующей глобальной переменной pid1 или pid2 присваивается значение 0 для того, чтобы не возникло проблем в обработчике прерывания мастер-процесса inth(). В строках 178–197 идет обработка информации о завершившемся процессе подобная той, которая была в оригинальной статье. Только на этот раз мы присваиваем переменной respawn значение true и в том случае, если процесс завершился нормально. Кроме этого, макросы WIFSIGNALED и WTERMSIG не работают правильно с waitid(), поэтому вместо них производится прямая проверка полей si_code и si_status переменной siginfo. В строках 199–206 мы идентифицируем второй воркер, посылаем ему сигнал, установленный в макросе PDEATHQUITSIGNAL, ожидаем его завершения и присваиваем соответствующей глобальной переменной pid1 или pid2 значение 0. Понятно, зачем первый воркер-бэкенд просто завершает свою работу после заданного времени? Я хочу протестировать, что второй воркер-фронтэнд получит сигнал завершения и оба процесса будут перезапущены. Соберем программу test2 с числом перезапусков 4
g++ -g -DMAXCYCLES=4 -o test2 main2.cpp
, и запустим ее без прерываний.
./test2
  0.000 | Master: 28165
  0.000 | (cycle 0) Worker 2: 28167
  0.002 | (cycle 0) Worker 1: 28166
  4.002 | Worker 1 exited with status 0
  4.002 | Sending quit signal to worker 2
  4.007 | (cycle 1) Worker 1: 28221
  4.007 | (cycle 1) Worker 2: 28222
  8.007 | Worker 1 exited with status 0
  8.007 | Sending quit signal to worker 2
  8.011 | (cycle 2) Worker 1: 28276
  8.012 | (cycle 2) Worker 2: 28277
 12.012 | Worker 1 exited with status 0
 12.012 | Sending quit signal to worker 2
 12.018 | (cycle 3) Worker 1: 28331
 12.019 | (cycle 3) Worker 2: 28332
 16.019 | Worker 1 exited with status 0
 16.019 | Sending quit signal to worker 2
Все верно. Каждые четыре секунды воркер-бэкенд завершал работу, мастер посылал сигнал прерывания фронтэнду и перезапускал их обоих. Давайте на каком-либо этапе прервем мастер-процесс.
./test2
  0.000 | Master: 325
  0.000 | (cycle 0) Worker 1: 326
  0.001 | (cycle 0) Worker 2: 327
  4.001 | Worker 1 exited with status 0
  4.001 | Sending quit signal to worker 2
  4.008 | (cycle 1) Worker 1: 383
  4.009 | (cycle 1) Worker 2: 384
^C  5.465 | Master terminated by signal 2
  5.465 | Sending signal 2 to worker 1
  5.465 | Sending signal 2 to worker 2
Работает. А теперь давайте запустим test2, перейдем во второй терминал, узнаем PID воркера-бэкенда, и пошлем ему сигнал SIGSEGV. Только сначала пересоберем test2 с другим значением CHILDLIFETIME, а то я не буду успевать переключаться между терминалами.
g++ -g -DMAXCYCLES=4 -DCHILDLIFETIME=10 -o test2 main2.cpp
./test2
  0.000 | Master: 9834
  0.000 | (cycle 0) Worker 1: 9835
  0.001 | (cycle 0) Worker 2: 9836
 10.001 | Worker 1 exited with status 0
 10.001 | Sending quit signal to worker 2
 10.005 | (cycle 1) Worker 1: 9973
 10.005 | (cycle 1) Worker 2: 9974
Во втором терминале быстро, как только появилась запись о старте cycle 1, вводим
ps -ef | grep [t]est
lyokha    9834 27454  0 22:47 pts/4    00:00:00 ./test2
lyokha    9973  9834  0 22:48 pts/4    00:00:00 ./test2
lyokha    9974  9834  1 22:48 pts/4    00:00:00 test_slapd -d 0 -h ldap://localhost:3333/ -f nullslapd.conf
kill -SEGV 9973
Возвращаемся в первый терминал и смотрим остаток вывода test2.
 18.545 | Worker 1 was signaled 11
 18.545 | Sending quit signal to worker 2
 18.551 | (cycle 2) Worker 2: 10122
 18.554 | (cycle 2) Worker 1: 10121
 28.555 | Worker 1 exited with status 0
 28.555 | Sending quit signal to worker 2
 28.567 | (cycle 3) Worker 2: 10272
 28.570 | (cycle 3) Worker 1: 10271
 38.571 | Worker 1 exited with status 0
 38.571 | Sending quit signal to worker 2
Все верно. Ручные переключение во второй терминал, определение PID воркера-бэкенда и посылка ему сигнала заняли 8.5 секунд, так что четыре секунды мне бы явно не хватило. Можно еще поиграть разными способами. Например, послать сигнал прерывания или сигнал SIGKILL одному из воркеров, или убить мастер-процесс сигналом SIGKILL. В обоих случаях и мастер, и оба воркера должны благополучно завершиться.

понедельник, 9 февраля 2015 г.

Хотите 2 + 2 = 5 в haskell?

Ничего сверхъестественного. Просто
2 + 2
5
Это фрагмент сессии ghci. Чтобы получить этот результат, строкой выше я ввел
let 2 + 2 = 5
Те, кто не первый день работает с haskell, легко разберутся в смысле этого выражения. Новички, наверное, посчитают его некой волшебной синтаксической конструкцией, на низком уровне переопределяющей семантику языка. Адепты языка C++ возможно предположат, что здесь происходит перегрузка оператора +. Это неверно, поскольку в haskell нет никаких операторов в смысле C++, но уже теплее. На самом деле, здесь объявлена инфиксная функция (+) (именно так, в скобках). Ее инфиксность в данном объявлении выражается в записи ее двух аргументов (2 и 2) слева и справа. Мы могли бы записать функцию (+) в обычной форме, когда аргументы расположены справа.
let (+) 2 2 = 5
В этом случае нам пришлось записать ее полное имя с окружающими скобками, чтобы избежать синтаксической ошибки. А что если аргументы функции отличаются от двоек? У нас ведь нет определения для этого случая.
3 + 4
*** Exception: <interactive>:40:5-13: Non-exhaustive patterns in function +
Так и есть. Тогда другой вопрос. Куда делась стандартная функция (+)? А никуда не делась: она по-прежнему определена в стандартном модуле Prelude, только теперь ее определение скрыто нашим новым определением. Чтобы использовать стандартный +, нужно полностью квалифицировать имя функции (+).
2 Prelude.+ 2
4
2 Prelude.+ 3
5
Пусть наш новый + возвращает 5 на 2 + 2 и стандартное значение в других случаях. Для этого нужно определить функцию (+) следующим образом.
:{
let 2 + 2 = 5
    a + b = a Prelude.+ b
:}
Все клозы одной функции должны быть объявлены в ghci одновременно, поэтому я использовал синтаксис многострочного объявления ghci с маркерами начала :{ и конца :}. Говоря более формально, данный синтаксис позволил связать (bind) все возможные клозы и аргументы функции (+) в едином лексическом пространстве (lexical scope). Проверяем.
2 + 2
5
3 + 4
7
Еще одна проверка.
3 + 4 * 2
14
А должно быть 11! Вычисление произошло таким образом, как если бы 3 и 4 сперва сложились, а затем результат сложения (7) был умножен на 2. Ну так и есть, мы же не определили ассоциативность и приоритет нашей инфиксной функции! По умолчанию в haskell любая инфиксная функция левоассоциативна и имеет наибольший приоритет, равный 9. Давайте определим значения ассоциативности и приоритета стандартного +
:i Prelude.+
class Num a where
  (Prelude.+) :: a -> a -> a
  ...
        -- Defined in ‘GHC.Num’
infixl 6 Prelude.+
и установим их для нашего +.
:{
let 2 + 2 = 5
    a + b = a Prelude.+ b
    infixl 6 +
:}
Вот теперь результаты вычислений в инфиксных выражениях (за исключением того, что 2 + 2 = 5) должны быть верными.
3 + 4 * 2
11
Кстати, есть весьма тонкое отличие в типе стандартной функции (+) и определенной нами. Прежде всего, обратите внимание на определение стандартного (+) в выводе :i Prelude.+ выше. Оно начинается со строки class Num a where. Это нестандартная функция в том смысле, что она объявлена как метод класса Num. Класс Num предоставляет базовый интерфейс для типов, которые хотят быть похожими на числа. Вот его определение.
:i Num
class Num a where
  (Prelude.+) :: a -> a -> a
  (*) :: a -> a -> a
  (-) :: a -> a -> a
  negate :: a -> a
  abs :: a -> a
  signum :: a -> a
  fromInteger :: Integer -> a
        -- Defined in ‘GHC.Num’
instance Num Integer -- Defined in ‘GHC.Num’
instance Num Int -- Defined in ‘GHC.Num’
instance Num Float -- Defined in ‘GHC.Float’
instance Num Double -- Defined in ‘GHC.Float’
Здесь перечислены все методы, которые должен реализовать тип, который хочет быть похожим на число, а также список типов (элементы после слов instance Num), которые уже реализовали методы класса Num. А теперь посмотрим на определение нашего +.
:i (+)
(+) :: (Num a, Eq a) => a -> a -> a
        -- Defined at <interactive>:7:7
infixl 6 +
Это простая функция, поэтому ее определение начинается сразу с описания типа, который равен (Num a, Eq a) => a -> a -> a. Выражения в скобках — это ограничения на тип (type constraints), буква aпеременная типа. Вся запись означает, что наш + реализует полиморфную функцию, то есть функцию, которая принимает аргументы любого типа с условием, что он должен предоставить реализации (instances) классов, перечисленных в списке ограничений на тип. Поскольку мы не стали определять тип функции (+) вручную, компилятор вывел его за нас. В определении первого клоза функции (+) мы использовали число 5 (его тип равен Num a => a), а в определении второго клоза — функцию Prelude.+, которая тоже ограничена классом Num — отсюда ограничение Num a. А откуда ограничение на Eq a, которого нет в стандартной функции (+)? Класс Eq предоставляет интерфейс для сравнения элементов типа: функции (==) и (/=). А теперь посмотрите на образец (pattern) первого клоза функции (+), он записан в инфиксной форме как 2 + 2. Чтобы механизм сопоставления с образцом (pattern matching) мог выбрать правильный клоз, он должен уметь сравнивать аргументы с двойкой — отсюда ограничение Eq a. Но несмотря на дополнительное ограничение Eq a, наша функция (+) будет работать со всеми типами, с которыми работает Prelude.+, поскольку все типы, реализующие методы класса Num, как правило реализуют и методы класса Eq.

понедельник, 5 января 2015 г.

haskell: оборачивание в Maybe как способ снятия ограничений на тип полиморфной функции

Решал на днях такую задачку. Дана нестрого возрастающая последовательность чисел, например [1, 2, 3, 3, 3, 4, 4, 5, 6]. Требуется построить массив строго возрастающих подпоследовательностей максимальных длин. Для приведенной последовательности первому условию будут удовлетворять списки [[1, 2, 3], [4], [5, 6]], [[1, 2], [3, 4], [5, 6]] и множество других. Второе условие требует, чтобы подпоследовательности имели максимально возможные длины. Этому условию в нашем примере удовлетворяет единственный вариант [[1, 2, 3], [4, 5, 6]]. В процессе решения задачи появилась необходимость в реализации функции splitAtEqualBounds, разбивающей исходную последовательность на подпоследовательности, граничащие друг с другом равными числами. При этом подпоследовательности, содержащие лишь один элемент должны быть отброшены как неинтересные. Так, для приведенной выше последовательности таким разбиением будет [[1, 2, 3], [3, 4], [4, 5, 6]]. Заметьте, что наша функция не требует возрастания исходной последовательности: она просто разбивает ее в областях повторения равных чисел, игнорируя длины этих областей. Реализовать функцию splitAtEqualBounds можно с помощью правой свертки исходной последовательности. В аккумуляторе будем накапливать результирующие подпоследовательности и последний прочитанный элемент-число. Представьте, что в процессе свертки мы прочли очередное число, которое отличается от того, что хранится в аккумуляторе. В этом случае мы помещаем это число в голову головного списка из аккумулятора (не в конец последнего списка, поскольку свертка правая!). В случае, если новое число совпало с числом из аккумулятора, мы добавляем в голову списка в аккумуляторе новый список, содержащий только это число. После свертки всех элементов исходной последовательности в результате получим кортеж, состоящий из списка списков чисел и уже не нужного числа, равного первому числу из исходной последовательности. Окончательный результат функции splitAtEqualBounds — это и есть этот список списков без подсписков с единственным элементом. Осталось понять, чему должно быть равно исходное значение аккумулятора. Итак, это кортеж из списка списков и некоторого числа. Этот исходный список списков согласно нашему алгоритму будет добавлен в конец результирующего списка списков. Пусть исходным значением аккумулятора будет ([[]], 0). Тогда функция splitAtEqualBounds примет вид
splitAtEqualBounds :: (Num a, Eq a) => [a] -> [[a]]
splitAtEqualBounds = filter ((>1) . length . take 2) . fst .
    foldr (\x (y@(z : zs), a) ->
        if x == a then ([x] : y, x) else ((x : z) : zs, x)) ([[]], 0)
Почему 0? Что если последним элементом исходной последовательности тоже будет 0? Оказывается, конкретное значение исходного числа в аккумуляторе при таком выборе исходного списка с единственным пустым подсписком вообще не имеет значения! Представьте, что последнее число исходной последовательности действительно равно 0. Тогда в хвосте результирующего списка списков на выходе из свертки окажется пустой подсписок, который благополучно отфильтруется функцией filter ((>1) . length . take 2). Если же последним числом исходной последовательности будет какое-либо другое число, то оно станет головой перед пустым списком, что сразу соответствует желаемому результату. Кстати, обратите внимание на использование take 2 в предикате фильтра: это простая оптимизация на случай, если подсписки окажутся очень длинными. Что плохо в этой функции? То, что мы используем явное число 0, а это ограничивает применение функции splitAtEqualBounds списками, типы элементов которых соответствуют классу Num a, то есть списками чисел. Мы не сможем применить нашу функцию к спискам строк, например. Так давайте избавимся от явного упоминания числа в аккумуляторе свертки и ограничения Num a в типе функции. Вот один из вариантов.
splitAtEqualBounds :: Eq a => [a] -> [[a]]
splitAtEqualBounds [] = []
splitAtEqualBounds x@(z : _) = filter ((>1) . length . take 2) . fst .
    foldr (\x (y@(z : zs), a) ->
        if x == a then ([x] : y, x) else ((x : z) : zs, x)) ([[]], z) $ x
Здесь мы просто связали (bind) аргумент функции x и подставили голову z вместо числа 0 в исходное значение аккумулятора. Но мне все равно не нравится такое решение, хотя функция вполне рабочая. Во-первых, пришлось добавить новый клоз для случая, когда аргументом функции является пустой список. Во-вторых, связывание аргумента, на мой взгляд, вредит элегантности кода. Ну и самое главное. Человек, не знающий, что для нашего алгоритма исходное значение числа в аккумуляторе свертки не важно, в праве задать вопрос: а почему сюда помещается первый элемент исходного списка, а не последний, например? Другими словами, использование произвольного элемента из исходного списка (мы взяли голову только потому, что это удобно и эффективно) грубо нарушает семантику (то есть смысл) исходного значения аккумулятора свертки. По смыслу, там должно находиться некоторое значение, подчеркивающее свою особенную неважность. Вот это подсказка! В haskell такое особенное значение предоставляет тип-обертка Maybe. Это значение — Nothing — особенно хорошо тем, что не ссылается на значения хранимого типа, а значит его можно использовать для исходного числа в аккумуляторе свертки без ограничений на тип функции splitAtEqualBounds. Вот ее новая реализация.
splitAtEqualBounds :: Eq a => [a] -> [[a]]
splitAtEqualBounds = filter ((>1) . length . take 2) . fst .
    foldr (\x (y@(z : zs), a) -> (,Just x) $
        if isNothing a || x == fromJust a
            then [x] : y
            else (x : z) : zs) ([[]], Nothing)
Вот теперь прекрасно видно, что исходное значение аккумулятора свертки — это просто некоторое неважное число. На этот раз для пущей элегантности я решил вынести второй элемент кортежа (раньше это был просто x, теперь Just x) вперед конструкции if: это не связано с изменением типа аккумулятора и могло быть проделано и в предыдущих вариантах функции splitAtEqualBounds. Выражение (,Just x) называется сечением кортежа (tuple section) и является расширением ghc, требующим указания опции -XTupleSections при компиляции, либо помещения строки
{-# LANGUAGE TupleSections #-}
в начало файла с исходным кодом. Кроме этого, новая реализация использует функции isNothing и fromJust из модуля Data.Maybe, поэтому в начало файла с исходным кодом следует также добавить
import Data.Maybe (isNothing, fromJust)
На этом, вроде бы, всё. Но давайте я все таки приведу функцию, которая решает задачу, описанную в начале статьи. Напомню, нам нужно построить строго возрастающие подпоследовательности с максимальными длинами.
buildLongerSubseqs :: Eq a => [a] -> [[a]]
buildLongerSubseqs = filter (not . null) . build . splitAtEqualBounds
    where build []       = [[]]
          build (x : xs) = snd $ mapAccumL meltShorterAndSwap x $ xs ++ [[]]
          meltShorterAndSwap x []    = ([], x)
          meltShorterAndSwap x y
              | length x >= length y = (tail y, x) 
              | otherwise            = (y, init x)
Для использования mapAccumL нужно подключить
import Data.List (mapAccumL)
Итак, список списков, возвращенный функцией splitAtEqualBounds, передаем в локальную функцию build. Функция build реализована с помощью библиотечной функции mapAccumL — гибрида левой свертки и отображения (map). Аккумулятором нашей свертки будет список типа a, его исходным значением — первый подсписок списка, возвращенного функцией splitAtEqualBounds. Функция свертки — meltShorterAndSwap — принимает два списка. Если длина первого списка больше или равна длине второго, то она возвращает кортеж (хвост второго списка, первый список), иначе — кортеж (второй список, первый список без последнего элемента). Свертка-отображение в функции build сформирована таким образом, что в функцию meltShorterAndSwap передаются пары соседних элементов исходного списка списков, начиная с пары первый и второй подсписки (дальше это уже не пара второй и третий подсписки, а возможно измененный второй и третий подсписки — для этого и нужна функция mapAccumL, см. ниже). Поскольку последний элемент первой пары совпадает с первым элементом второй пары, то функция meltShorterAndSwap как бы стирает границу между ними, предпочитая удалять соответствующий элемент из списка наименьшей длины. Я представил этот процесс как таяние (melt) списка наименьшей длины, отсюда и название функции. Функция meltShorterAndSwap возвращает возможно измененный первый список во втором элементе кортежа, а возможно измененный второй список — в первом. Этот обмен элементами (swap) необходим, поскольку функция свертки должна возвращать для mapAccumL кортеж, первым элементом которого является аккумулятор, а вторым — строящийся список-отображение. В нашем случае в аккумулятор помещается возможно измененный в первый раз второй элемент пары (и, естественно, на следующем шаге свертки он станет кандидатом на добавление в строящийся список как первый элемент пары), а в строящийся список — возможно измененный во второй раз, ставший в прошлый раз аккумулятором, первый элемент пары. Вот так замысловато! Надо сказать, что на последнем шаге свертки останется пара возможно измененный последний подсписок и пустой список — для этого случая предусмотрен первый клоз функции meltShorterAndSwap. В списке, возвращенном функцией build могут присутствовать пустые списки. Это может произойти в случае, если исходный двух-элементный подсписок окружали подсписки большей длины. В этом случае эти большие подсписки растопят маленький список с двух сторон, превратив его в пустой список. Пустые списки удаляются из результирующего списка с помощью filter (not . null). Кстати, сравнение длин в функции meltShorterAndSwap может оказаться неэффективным, если длина одного из переданных ей списков будет значительно превышать длину второго. Если вопрос эффективности важен, можно заменить сравнение длин в охранном выражении length x >= length y на вызов новой функции isNotShorter, определив ее ниже. Вот переписанный участок исходного кода (начиная с первого клоза meltShorterAndSwap).
          meltShorterAndSwap x []  = ([], x)
          meltShorterAndSwap x y
              | x `isNotShorter` y = (tail y, x) 
              | otherwise          = (y, init x)
          isNotShorter _        []       = True
          isNotShorter []       _        = False
          isNotShorter (_ : xs) (_ : ys) = isNotShorter xs ys
Давайте проверим работу функции buildLongerSubseqs. Вот такая тестовая функция main
main :: IO ()
main = do
    let seqs = [[1, 2, 3, 3, 3, 4, 4, 5, 6],
                [1, 2, 3, 3, 4, 4, 5, 6],
                [1, 2, 3, 3, 4, 4, 5, 5, 6, 7],
                [1, 2, 3, 3, 4, 5, 5, 6, 7],
                [1, 1, 1, 2, 3, 3, 4, 5, 5, 6, 7],
                [1, 1, 1, 2, 3, 3, 4, 4, 5, 5, 6, 6, 6, 6, 7, 8, 9]]
    mapM_ (\x -> putStr ">>> " >> print x >> putStr "    " >>
        print (buildLongerSubseqs x)) seqs
выводит на экран
>>> [1,2,3,3,3,4,4,5,6]
    [[1,2,3],[4,5,6]]
>>> [1,2,3,3,4,4,5,6]
    [[1,2,3],[4,5,6]]
>>> [1,2,3,3,4,4,5,5,6,7]
    [[1,2,3],[4],[5,6,7]]
>>> [1,2,3,3,4,5,5,6,7]
    [[1,2,3],[4],[5,6,7]]
>>> [1,1,1,2,3,3,4,5,5,6,7]
    [[1,2,3],[4],[5,6,7]]
>>> [1,1,1,2,3,3,4,4,5,5,6,6,6,6,7,8,9]
    [[1,2,3],[4,5],[6,7,8,9]]
На мой взгляд, это правильный результат. Еще раз подчеркну, что функция buildLongerSubseqs может работать не только с числами, но и с любыми типами, ограниченными только классом Eq a. Кроме того, она будет работать не только с возрастающими, но и с любыми другими последовательностями, разбивая исходную последовательность в областях с повторяющимися элементами.