Javascript.RU

COMET: серверная часть - паттерны реализации

В этой статье мы рассмотрим распространенные способы создания COMET-сервера и примеры таких серверов на языках Java, Javascript, Python, Erlang.

Основное внимание уделено паттернам серверной поддержки, хотя и примеры реализаций, рассмотренные ниже, вполне расширяемы и жизнеспособны.

При программировании сайтов, создании динамических страничек и сервисов, как правило, удобен простой цикл обработки запроса:

  1. Принять запрос
  2. Сгенерировать страничку
  3. Выдать страницу
  4. Закрыть соединение

При этом для параллельной обработки запросов создается много процессов или потоков, между которыми распределяются приходящие запросы.

Рассмотрим эти два варианта подробнее.

Самый простой для программирования подход - это обрабатывать запрос в отдельном процессе операционной системы.

В этом случае программа, генерирующая страницы, запускается в множестве экземпляров (процессов), и пришедший запрос отдается тому процессу, который в данный момент свободен.

Можно запускать фиксированное количество, например, ровно 20 процессов, можно регулировать количество процессов динамически: например, от 5 до 30, в зависимости от количества запросов и нагрузки.

При этом по окончании обработки запроса процесс не умирает, а ждет следующего запроса, обрабатывает его и т.д.

Эту модель называют еще "pre-fork", и она заменила собой совсем старый подход, когда на каждый запрос происходил отдельный запуск программы-генератора страницы.

Таким образом работает большинство интерпретаторов с интерфейсом FastCGI: PHP/FastCGI, Perl/FastCGI, Ruby/FastCGI и другие. Сюда же относится WSGI(Python), да и вообще много чего.

Плюс этого способа - в том, что он простой. Еще бы - процесс является "средой в себе", со своими переменными, своим потоком управления, и никто случайно из одного процесса в другой не залезет.

Можно спокойно разобрать запрос, сгенерировать и выдать ответ посетителю.

А если вдруг где-то ошибка - так ничего: процессы изолированы на уровне операционной системы. Один упал зато другие живы.

Когда процесс заканчивает обработку запроса, в его адресном пространстве (где хранятся переменные, функции и т.п.) содержатся данные, которые были необходимы для генерации ответа.
Например, выбранные записи из базы данных, информация о посетителе, загруженные библиотеки и т.п.

Чтобы корректно обработать следующий запрос - нужно эти лишние данные удалить, почистить адресное пространство процесса. Для этого есть два основных подхода.

В случае, когда программа написана на каком-либо скриптовом языке, интерпретатор этого языка может очищать адресное пространство после окончания запроса.

Таким образом, очистка происходит помимо программиста, который хотя и лишен возможности влиять на происходящее (скажем, оставить что-то из данных процесса "на потом"), зато уверен, что каждый запрос начинает обрабатываться "с чистого листа".

Это происходит, например, в PHP и дает дополнительные гарантии от утечек памяти.

Чистка интерпретатором - решение, мягко говоря, жестковатое. Вместе с лишними переменными убивается и все полезное: библиотеки, синглтоны, вспомогательные объекты, которые можно было бы использовать при обработке следующего запроса.

Получается, что при новом запросе все эти объекты будут созданы заново. Но если мы используем большой мощный фреймворк, то каждый раз инициализовывать его заново - нерационально.

Поэтому естественное решение - возложить обязанности по чистке на программиста. Пусть сам удаляет то, что не нужно.

Типичный цикл обработки запроса при этом выглядит так:

// инициализация фреймворка
include MyFramework;

MyFramework::init();

// while ждет, пока процессу будет передан новый запрос
while(request = FastCGI::accept()) {
    MyFramework::process(request);
}

Функция MyFramework::process(request) читает нужные данные, генерирует и выдает ответ.
Объект запроса request является локальным для функции и по окончании ее работы будет удален, как и все остальные локальные и недостижимые снаружи объекты. А необходимые библиотеки и синглтоны - сохранены до следующего запроса.

Удобно? Удобно! Получается достаточно оптимальный цикл работы, который используется, например, в Perl/FastCGI, Ruby/FastCGI.

Общение между процессами и обмен данными, как правило, осуществляются через выделенную область памяти. При этом можно передавать только строковые представления объектов, которые необходимо превращать в реальный объект перед использованием.

Потоки отличаются от процессов одной "мелочью" - общим адресным пространством.

Поэтому несколько одновременно пришедших запросов, хотя и будут обрабатываться разными потоками, имеют доступ к одним и тем же переменным.

При этом никто не гарантирует, что скорость обработки запросов будет одинаковая у разных потоков, и с этим связана одна важная проблема.

Например, есть код:

counter = 0
...
counter++
...
print counter

При поточном выполнении - вообще говоря, не факт что print counter выведет единицу. Может быть и ноль, если какой-то другой запрос начал обрабатываться и успел обнулить счетчик, пока мы были между "counter++" и "print counter". Переменная-то одна на всех.

Это и является основной сложностью поточного программирования - необходимость синхронизировать доступ к общим ресурсам между потоками: к соединениям с базой, переменным, и т.п.

Пример вверху решается очень легко: достаточно сделать counter локальной для функции обработки запроса:

process(request) {
  int counter = 0;
  ...
  counter++;
  ...
  print counter;
}

Каждый новый запрос вызовет process со своим объектом запроса, а локальные переменные - на то и локальные, чтобы быть видными только при текущем запуске функции. Так что в данном случае каждый запрос будет использовать свою собственную переменную counter и проблем не возникнет.

Кроме того, можно явно привязать переменную или объект к текущему потоку, использовать "критические" блоки кода, т.е. такие, в которые может зайти не более одного потока за раз, применять мьютексы и другие приемы многопоточного программирования.

Единое адресное пространство позволяет избежать затрат и сложностей, связанных с обменом данными между процессами. В частности, один поток может вызвать функцию другого и т.п.

Создание потока - более быстрая операция, чем создание нового процесса, что также является преимуществом многопоточной модели. Но аккуратное разделение и синхронизация переменных, отлов возникающих глюков могут сильно осложнить жизнь.

Такая модель - классическая для сервлет-контейнеров (Java): таких как Tomcat, Resin и другие. Она подробно описана в спецификации Servlet API 2.5. Она же используется и в C#.NET.

COMET-приложение имеет другой режим функционирования, принципиально отличающийся от обычной генерации страницы.

  1. Принять запрос (посетитель подключился к чату)
  2. Ждать, пока появится сообщение
  3. Получив сообщение, сгенерировать на его основе ответ
  4. Выдать ответ
  5. (Зависит от реализации) Закрыть соединение

Как правило, большую часть времени соединение висит в ожидании события. При этом выделенный процесс или даже поток на такое соединение - колоссальный перерасход ресурсов, прежде всего - памяти.

Одновременно подключенные к серверу 100 человек при классическом подходе дали бы 100 процессов(или потоков), большая часть которых будет висеть и ждать сообщения. Это достаточно большие затраты памяти, да и процессорное время тоже тратится, хотя и не в такой степени.

Проблема становится очевиднее при цифрах в 1000, 10.000 соединений, которые вполне достижимы в проектах, рассчитанных на большое количество одновременных пользователей: чаты, онлайн-игры, сайты знакомств и др.

Существует много COMET-серверов, но все они основаны на всего нескольких паттернах преодоления этой проблемы.

Этот паттерн заключается в том, что все действия, которые требуют какого-либо ожидания или занимают существенное время, совершаются асинхронно.

На клиентской стороне такой паттерн вовсю используется в javascript:

var xhr = new XmlHttpRequest()
function processResult(result) {
 ...
}
xhr.onreadystatechange = processResult

xhr.send()
...

Вызов xhr.send() запускает выполнение запроса, и скрипт продолжает выполняться. А когда XmlHttpRequest выполнится, то он запустится обработчик события onreadystatechange - функция processResult.

Таким же образом устроено и серверное асинхронное программирование. Рассмотрим это на примере COMET-сервера, написанного на фреймворке Twisted (язык Python).

Следующий пример иллюстрирует работу COMET-сервера в режиме long-poll. У него есть две точки подключения: /publish и /subscribe.

При подключении к /subscribe сервер держит соединение, пока не появится сообщение. Если подключилось много клиентов - все они будут висеть и ждать.

При подключении к /publish сервер считывает сообщение из параметра message и отправляет его всем клиентам, закрывая соответствующие соединения.

class ClientManager:
    clients = []
    def registerClient(self, client):
        self.clients.append(client)

    def broadcastMessage(self, message):
        for client in self.clients:
            client.write(message)
            client.finish()
        self.clients = []
clientManager = ClientManager()


class CometServer(resource.Resource):
    isLeaf = False
    def __init__(self):
        resource.Resource.__init__(self)
        self.registerChildren()

    def registerChildren(self):
        self.putChild('subscribe', Subscribe() )
        self.putChild('publish', Publish() )

    def getChild( self, path, req ):
        if path == '':
            return self
        else:
            return resource.Resource.getChild( self, path, req )


class Subscribe(resource.Resource):
    isLeaf = True
    def render_GET(self, req):
        clientManager.registerClient(req)
        return server.NOT_DONE_YET


class Publish(resource.Resource):
    isLeaf = True
    def render_POST(self, req):
        clientManager.broadcastMessage(req.args['message'][0])
        return "ok"


application = service.Application("App")
service = internet.TCPServer(8001, server.Site(CometServer()))
service.setServiceParent(application)

Разберем основные сущности подробнее:

ClientManager
Вспомогательный объект-синглтон, хранящий ожидающие сообщение соединения в списке clients.

Умеет добавлять подключившихся клиентов в список при вызове registerClient и рассылать всем из списка сообщение в broadcastMessage.

CometServer
Основной объект-синглтон, являющийся сервером. Он написан на фреймворке Twisted.web и делегирует обработку соединений объектам Subscribe() и Publish(), которые делают реальную работу.
Subscribe
Любой клиент. который делает запрос на адрес /subscribe, будет обработан (одним и тем же) объектом этого класса. В данном случае он добавляет клиента в список и возвращает специальное значение server.NOT_DONE_YET, получив которое, Twisted не закрывает соединение.

Таким образом, полученные соединения передаются для хранения в clientManager.

Publish
Получив POST-запрос, рассылает сообщение message его всем подписчикам.
код после Publish
Служебный код для инициализации и запуска серверного демона. Для нас он не важен.

Все происходит в едином адресном пространстве и в одном процессе/потоке.

Обработка множества одновременных соединений происходит благодаря асинхронному подходу.

Любое соединение, требущее ожидание, добавляется в список clientManager.clients, который используются при получении сообщений.

Что очень удобно, соединения доступны в любой момент. Их можно пронумеровать, положить в ассоциативный массив по сессии клиента и т.п.
Получив сообщение, можно выбрать нужные соединения из списка ожидающих и сделать с ними все что угодно.

Описанная асинхронная модель дает общее адресное пространство, и при этом - никаких проблем с синхронизацией доступа, свойственных потокам.

Для того, чтобы работать в таком асинхронном режиме, код должен поддерживать соответствующее API и вызывать каллбэки. Не весь код сделан таким образом.

Например, ряд библиотек для работы с базами данных работает только синхронно, без каллбэков. Синхронное программирование, вообще, проще и привычнее большинству программистов.

Запускать такие функции в одном потоке нельзя - они заблокируют обработку новых соединений.
Чтобы все-таки сделать возможными такие синхронные вызовы, в Twisted есть прямой доступ (adbapi использует его неявно) к пулу потоков, размер которого можно указать явно вызовом:

reactor.suggestThreadPoolSize(100)

Функция, которая работает синхронно, может быть отправлена работать в отдельный поток вызовом deferToThread:

deferred = threads.deferToThread(syncSQL, "SELECT * FROM mytable")

deferred.addCallback(printResult)

Возвращаемое значение deferred является специальным объектом типа Deferred и вызовет функции, добавленные через addCallback, в момент получения результата.

В данном случае - в момент, когда функция из отдельного потока закончит выполнение.

Использовать потоки можно и в более явном виде, управлять ими вручную и т.п. Но, как правило, при асинхронном программировании все основные данные хранятся в единственном основном процессе, а потоки используются только для тех операций, которые не поддерживают асинхронный вызов.

Основной процесс использует только одно ядро. Если нагрузка на процессор становится слишком большой, то никакого автоматического распараллеливания не происходит.

Поэтому используются два подхода.

  • Первый - сложные операции выносятся в отдельные потоки. Если говорить о Python, то при этом возникают дополнительные сложности, связанные с Global Interpreter Lock, но они преодолимы.

    Разумеется, доступ к общим переменным и сетевые операции (они не thread-safe) потребуют дополнительной синхронизации.

  • Второй - запускается несколько экземпляров сервера, и нагрузка балансируется между ними. В этом случае данные будут уже не в едином адресном пространстве.

    Но если у вас такое мощное приложение - возможно, это решение будет правильным, т.к. оно более масштабируемо: разные процессы проще будет разнести по разным машинам, чем потоки, использующие общее адресное пространство.

Сервер Node.JS написан на Javascript и использует быструю библиотеку V8 для интерпретации Javascript-кода и libev для обработки событий. Он работает аналогичным образом. Все соединения обрабатываются единственным процессом/потоком.

Вот код такого же по функционалу COMET-сервера на Node.JS:

sys = require('sys'),
http = require('http');
url = require('url')

clientManager = new function() {
    var clients = []

    this.registerClient = function(client) {
      clients.push(client)
    }

    this.broadcastMessage = function(message) {
      for(var i=0; i<clients.length; i++) {
        var client = clients[i]
        client.writeHeader(200, {'Content-Type': 'text/plain;charset=utf-8'})
        client.write(message.toString(), 'utf-8')
        client.close()
      }
      clients = []
    }
}

http.createServer(function (req, res) {
  var urlParsed = url.parse(req.url, true)

  if (urlParsed.pathname == '/publish') {
    clientManager.broadcastMessage(urlParsed.query.message)
    res.writeHeader(200, {'Content-Type': 'text/plain;charset=utf-8'})
    res.write('ok', 'utf-8')
    res.close()
  }

  if (urlParsed.pathname == '/subscribe') {
    clientManager.registerClient(res)
  }

}).listen(8000);

Как видно, примеры на Node.JS и Twisted очень похожи. Одинаковой функциональностью обладает clientManager. Различие только в синтаксисе.

Конечно Node.JS является гораздо более молодым фреймворком и функционально далек от такого гиганта как Twisted, но он легче, написан на новых библиотеках и вполне юзабелен для несложных приложений.

Этот паттерн прибрел широкую известность благодаря веб-серверу Jetty, написанному на Java.

Он заключается в том, что запрос посетителя "упаковывается" в специальный объект Continuation, который передается обработчику асинхронного события.

Когда событие происходит, обработчик либо выдает ответ самостоятельно, либо вызывает метод continuation.resume(), который заставляет сервер обработать запрос заново, но при этом объект continuation будет содержать данные события.

Вот так это выглядит:

public class SubscribeServlet extends HttpServlet {
    
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        
        Continuation continuation = ContinuationSupport.getContinuation(req);

        if (continuation.isInitial()) {
            continuation.suspend();
            ClientManager.getInstance().registerClient(continuation);
        } else {
            String message = (String)continuation.getAttribute("message");
            resp.setContentType("text/plain;charset=utf-8");
            resp.getWriter().print(message);            
        }

    }
}

Рассмотрим происходящее в деталях:

  1. Пришел запрос в SubscribeServlet
  2. Запрос оборачивается в объект continuation.
  3. При создании continuation.isInitial() возвращает true, поэтому будет исполнен код:
    continuation.suspend();
    ClientManager.getInstance().registerClient(continuation);
    

    Первая строка говорит серверу, что объект continuation необходимо оставить в памяти после завершения запроса и не закрывать соединение, а вторая - регистрирует его среди ожидающих клиентов.

  4. На этом поток заканчивает обработку запроса и может быть использован для обработки новых запросов.

В дальнейшем приходит сообщение на /publish. Такие запросы попадают в PublishServlet:

public class PublishServlet extends HttpServlet {
    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        ClientManager.getInstance().broadcastMessage(req.getParameter("message"));
        resp.setContentType("text/plain;charset=utf-8");
        resp.getWriter().print("ok");
    }
}

Этот сервлет вызывает метод ClientManager#broadcastMessage, в котором и происходит все интересное:

public class ClientManager {
    ArrayList<Continuation> continuations = new ArrayList<Continuation>();

    public synchronized void registerClient(Continuation continuation) {
        continuations.add(continuation);
    }

    public synchronized void broadcastMessage(String message) {
        for(Continuation continuation: continuations) {
            continuation.setAttribute("message", message);
            continuation.resume();
        }
        continuations.clear();
    }

    // синглтон

    private static ClientManager ourInstance = new ClientManager();

    public static ClientManager getInstance() {
        return ourInstance;
    }

    private ClientManager() {   }

}

Как видно из последних строк кода, этот объект является синглтоном.

Список ожидающих запросов, обернутых в объект типа Continuation, хранится в свойстве continuations. Это свойство является обычным ArrayList, операции с которым не thread-safe.

При вызове broadcastMessage во все continuations записывается поступившее сообщение (туда можно записать любой объект) и производится вызов continuation.resume().

При таком вызове сервер запускает обработку запроса, на основе которого создан continuation, заново. При этом continuation.isInitial() уже вернет false, т.к. запрос не является изначальным.

В SubscribeServlet это приведет к выполнению нижней ветки кода:

String message = (String)continuation.getAttribute("message");
resp.setContentType("text/plain;charset=utf-8");
resp.getWriter().print(message);

Здесь мы не вызываем continuation.suspend(), поэтому соединение будет закрыто.

Обратим внимание, что методы синглтона clientManager являютя критическими, т.е. помечены как synchronized. Это обязательно, иначе могут возникнуть ошибки одновременного доступа.

Альтернативная модель использования Continuations заключается в генерации ответа непосредственно при обработке события, без пробрасывания запроса в новый цикл обработки.

В примере выше это означало бы, что вместо вызова continuation.resume() метод broadcastMessage сгенерировал бы ответ, отправил его и закрыл continuation.

Это кажется более простым вариантом, но вспомним, что клиенты обрабатываются в цикле:

for(Continuation continuation: continuations) {
            continuation.setAttribute("message", message);
            continuation.resume();
        }

Если вместо continuation.resume(); вставить сколько-нибудь сложную генерацию ответа, то придется самостоятельно заботиться о том, чтобы она была асинхронной, чтобы цикл не обрабатывал клиентов по очереди - а их может быть много.

То есть, придется самостоятельно работать с потоками и т.п.
С другой стороны, для генерации простого ответа подойдет и такой способ: в нем отсутствуют излишние затраты на повторный вход в поток обработки запроса.

Обе модели также реализованы в сервере GlassFish и учтены в спецификации Servlet API 3.0.

При описанном подходе в основе всего лежит ThreadPool. Потоки запускаются, обрабатывают запрос и, по окончании обработки, возвращаются в пул и могут быть использованы заново.

При этом соединение инкапсулируется в объекте типа Сontinuation и хранится в памяти. Вызов этого объекта позволяет продолжить работу с отложенным соединением в любой момент.

Использование потоков естественным образом приводит к загрузке всех доступных ядер/CPU.

Предыдущие методы (асинхронная обработка и continuations) объединяет общий подход к задаче обработки множества соединений.

Они сохраняют состояние в памяти и освобождают поток для дальнейшей работы.

"Микронити" - это решение с совершенно другой стороны. Вместо того, чтобы освобождать поток, он делается максимально легким, чтобы можно было поддерживать 100, 1000, 10.000 и более потоков без существенной загрузки ресурсов.

Это достигается в первую очередь благодаря отказу от потоков OS и от использования стека операционной системы. Вместо этого виртуальная машина интерпретатора поддерживает собственный стек и механизмы переключения между контекстами.

Одним из самых известных языков, основанных на микронитях, является Erlang.

Он является функциональным языком программирования со специфическим синтаксисом и множеством оригинальных возможностей, включая встроенную поддержку кластеризации, базы данных, горячую замену кода без остановки обслуживания и другие.

Для реализации COMET-сервера я использовал фреймворк MochiWeb.

Вот как он выглядит:

room(Users) ->
    receive
        {From, subscribe} ->
            From ! subscribed,
            room([From | Users]);
        {From, post, Message} ->
            From ! posted,
            lists:foreach(fun(User) ->
                    % broadcast the message
                    User ! Message
                end, Users),
            room([]);
        _Any ->
            room(Users)
    end.

get_the_room() ->
    % does the room exists?
    Pid = whereis(theroom),
    if
        is_pid(Pid) ->
            % yup
            Pid;
        true ->
            % create it
            NewPid = spawn(fun() ->
                room([])
            end),
            register(theroom, NewPid),
            NewPid
    end.


loop(Req, DocRoot) ->
    "/" ++ Path = Req:get(path),
    case Req:get(method) of
        Method when Method =:= 'GET'; Method =:= 'HEAD' ->
            case Path of
                "subscribe" ->
                    Room = get_the_room(),
                    Room ! {self(), subscribe},
                    receive
                        subscribed ->
                            % subscription is ok
                            % now wait for a message
                            receive
                                Message ->
                                    ok
                            after ?TIMEOUT ->
                                % we waited too long
                                Message} = <<"timeout">>
                            end
                    after 50000 ->
                        % subscription failed on time
                        Message = <<"timeout">>
                    end,

                    % send back the JSON message
                    Req:ok({"text/plain;charset=utf-8", Message});
                end;
        'POST' ->
            case Path of
                "publish" ->
                    Data = Req:parse_post(),
                    Room = get_the_room(),
                    % post
                    Room ! {self(), post, list_to_binary(proplists:get_value("message", Data))},
                    receive
                        posted ->
                            % posted
                            ok
                    after 50000 ->
                        % something went wrong
                        notok
                    end,

                    % send back the message
                    Req:ok({"text/plain", "ok"});
                _ ->
                    Req:not_found()
            end;
        _ ->
            Req:respond({501, [], []})
    end.

На тот случай, если Эрланг для вас внове - позвольте объяснить происходящее.

Отличительной особенностью языка является модель легковесных процессов. Вот почему язык стимулирует создание большого количества параллельных процессов. Они изолированы друг от друга и не имеют общего состояния.

Между процессами можно установить связь и получить сообщение об их состоянии. Для взаимодействия процессов используется асинхронный обмен сообщениями. Каждый процесс имеет свою очередь сообщений.

Каждая функция в Erlang производит вычисления выражений, разделенных запятыми и возвращает последнее вычисленное.

Итак, рассмотрим код выше по частям.

Все подключения хранятся в процессе с именем theroom. Это аналог рассмотренного выше clientManager, но вместо синглтона используется отдельный процесс.

Метод get_the_room пытается получить Pid процесса с именем theroom и, если такого процесса нет, генерирует его вызовом встроенного метода spawn.

get_the_room() ->
    % does the room exists?
    Pid = whereis(theroom),
    if
        is_pid(Pid) ->
            % yup
            Pid;
        true ->
            % create it
            NewPid = spawn(fun() ->
                room([])
            end),
            register(theroom, NewPid),
            NewPid
    end.

Единственная задача этого метода - запуск процесса theroom. При запуске процесса ему передается для выполнения функция room.

Первый аргумент room - текущий список подключенных посетителей. Изначально при вызове spawn этот список пустой.

room(Users) ->
    *!*receive*/!*
        {From, subscribe} ->
            From ! subscribed,
            room([From | Users]);
        {From, post, Message} ->
            From ! posted,
            lists:foreach(fun(User) ->
                    % broadcast the message
                    User ! Message
                end, Users),
            room([]);
        _Any ->
            room(Users)
    end.

Директива receive заставляет процесс висеть, ожидая сообщение.

При получении сообщения subscribe от процесса From, она отправляет в ответ сообщение subscribed:

% этот синтаксис означает отправку сообщения subscribed 
% процессу с идентификатором From
From ! subscribed,

Сообщение отправляется асинхронно, т.е. функция продолжает работу, вызывая room([From | Users]) - то есть саму себя, но со списком, в который добавлен процесс From (операция | означает добавление элемента в список).

Этот рекурсивный запуск приведет к новому вызову receive и так далее. Обычное функционирование процесса, только вместо цикла - рекурсивный вызов.

Ветка {From, post, Message} активируется при получении сообщения post и рассылает сообщение всем процессам из текущего списка.

From ! posted,
lists:foreach(fun(User) ->
    % Отослать сообщение Message процессу User
     User ! Message
end, Users),

Как видите, каждому посетителю соответствует отдельный процесс.

Проверка, какое сообщение прислано, осуществляется путем сравнения с паттерном, в котором слова с заглавной буквы обозначают переменные, а со строчной - константы.

В примере выше это означает, что сообщение, состоящее из двух символов (Что-нибудь, subscribe) пойдет в первую ветку, из трех символов (Что-нибудь, post, Что-нибудь) - во вторую, а не подходящее под оба этих паттерны - пойдет в ветку _Any.

После рассылки сообщений рекурсивный вызов room([]) происходит с пустым списком, и процесс продолжает работу.

Эта функция обрабатывает входящие запросы. Как легко видеть из кода, она разбрасывает их по веткам subscribe и publish.

Ветка subscribe:

% получить идентификатор процесса комнаты
Room = get_the_room(),
% отослать сообщение subscribe (асинхронно)
Room ! {self(), subscribe},
% подвиснуть в ожидании ответа от комнаты
receive
    subscribed ->
        % подписка удалась, подождать сообщения
        receive
            Message ->
                ok
        % таймаут ожидания
        after ?TIMEOUT ->
            % слишком долго ждали сообщения
            Message} = <<"timeout">>
        end
after 50000 ->
    % не сигнала об успешной подписке
    Message = <<"timeout">>
end,
% отправить ответ или ошибку посетителю
Req:ok({"text/plain;charset=utf-8", Message});

Ветка publish работает аналогичным образом, разве что попроще.

Благодаря использованию легких процессов, 100, 1000 и даже 100.000 процессов легко обрабатываются виртуальной машиной Erlang/OTP.

Концепция микропотоков никак не противоречит Continuations.
Процессы эрланга могут быть заморожены (hibernate), при этом максимально очищается связанная с процессом память и освобождаются ресурсы, а затем разморожены при получении сообщения.

Это особенно удобно, когда процессы висят подолгу, ожидая сообщение.

С этим связаны некоторые сложности в программировании, т.к. hibernate, кроме всего прочего, очищает стек, но они вполне преодолимы.

Как вы, наверное, уже догадались(или знали), Erlang позволяет создавать ну очень производительные, масштабируемые и надежные приложения.

Для работы с большим количеством соединений необходимы некоторые настройки операционной системы.

Для Linux это можно сделать следующими строчками в /etc/sysctl.conf:

# General gigabit tuning:
net.core.rmem_max = 16777216
net.core.wmem_max = 16777216
net.ipv4.tcp_rmem = 4096 87380 16777216
net.ipv4.tcp_wmem = 4096 65536 16777216
net.ipv4.tcp_syncookies = 1

# this gives the kernel more memory for tcp
# which you need with many (100k+) open socket connections
net.ipv4.tcp_mem = 50576 64768 98152
net.core.netdev_max_backlog = 2500
net.ipv4.tcp_keepalive_time = 300
net.core.somaxconn = 4096

Плюс - поднять лимит на количество открытых файлов. Например, только для root: (/etc/security/limits.conf)

# ....
*               soft    nofile          1024
*               hard    nofile          2048
root            soft    nofile          32768
root            hard    nofile          32768

Исходники всех серверов, упомянутых в данной статье, вы можете скачать.

Ряд серверов предлагает дополнительные обертки и фреймворки, упрощающие COMET, включая javascript-библиотеки и различные режимы работы: polling / streaming / long-poll.

Вот список самых распространенных открытых серверов:

Java
Javascript
Python
Erlang
Ruby

Java - сервера используют Continuations, Python/Ruby/Javascript - события в едином потоке, Erlang-реализация основана на микронитях с возможностью hibernate.

Соответственно, в плюсе у Java-серверов - естественная масштабируемость на множество потоков и использование множества CPU, у Python/Ruby/Javascript - асинхронное программирование в едином потоке без синхронизаций, Erlang обеспечивает кластеризацию и отказоустойчивость на множестве серверов, очень хорошие характеристики по производительности и затратам памяти.

Вам выбирать, что подойдет для ваших приложений.


Автор: aleks.raiden (не зарегистрирован), дата: 12 марта, 2010 - 19:23
#permalink

Где APE, Dklab_Realplexor?


Автор: Илья Кантор, дата: 13 марта, 2010 - 02:27
#permalink

Не знаю проектов серьезных, которые ими пользуются. Знаете - напишите...


Автор: aleks.raiden (не зарегистрирован), дата: 13 марта, 2010 - 12:14
#permalink

Rutwit тот же. Так вам серьезностью мерятся (кстати, какая единица измерения) или показать современные решения в этой области?
Кстати, а кто из серьезных использует Node.js (да, знаю, Plurk перешел).
Тогда уж укажите для каждого из проектов кто и где их использует, если по этому критерию был отбор.

P.S. Это никак не умаляет ценности и интересности всей статьи, конечно же.
P.P.S. Кстати, для Java есть новый активно развивающийся Comet-фреймворк Atmosphere (https://atmosphere.dev.java.net/)


Автор: Илья Кантор, дата: 13 марта, 2010 - 21:35
#permalink

В настоящий момент есть очень много COMET-серверов на разных языках и платформах. Все их перечислить невозможно, на и бессмысленно - гугл все равно актуальнее.

В статье упомянуты те, о которых слышал от знакомых (применяющих их) или сам имел с ними дело и знаю что оно работает ок. Это единственный мой критерий.

А самое главное, в чем центр внимания статьи - это модель (паттерн) COMET, по которой работает сервер. Это ядро, и сколько бы ни было серверов - это лишь разные синтаксические надстройки над одним из описанных в статье паттернов.

С уважением,
Илья Кантор


Автор: Александр второй (не зарегистрирован), дата: 30 мая, 2010 - 09:31
#permalink

А где можно почитать про редирект на Java?


Автор: Valery (не зарегистрирован), дата: 31 августа, 2010 - 16:54
#permalink

Вопрос об использовании framework`ов.

На сколько я понимаю все эти сервера используют базовые принципы которые были описаны в одной из статей. К примеру указана что orbited использует как pooling, long-pooling, Callback-Polling, Iframe Streaming, HTML5 WebSocket и тд (http://cometdaily.com/maturity.html) Что реально под этим фреймворком? Каким образом он использует все эти подходы?

В документации к APE вообще только вскользь упоминается о long pooling, при этом они обещают с легкостью поддерживать 100k соединений.

Есть ли смысл писать собственную реализацию comet или проще использовать готовый фреймворк?


Автор: Илья Кантор, дата: 31 августа, 2010 - 20:28
#permalink

Имеет смысл использовать готовый фреймворк, написанный на том же языке, на котором основной сайт. Это позволит вам с легкостью интегрировать модель, авторизацию и т.п.

С другой стороны, long polling несложен в реализации... Но опять же, нужна платформа для большого кол-ва соединений.


Автор: pluseg, дата: 25 октября, 2010 - 00:36
#permalink

А можете подсказать PHP COMET-сервер? А то что-то обошли его [PHP] стороной, даже странно.


Автор: Илья Кантор, дата: 25 октября, 2010 - 07:10
#permalink

Можно использовать phpDaemon для этой цели.


Автор: Марк (не зарегистрирован), дата: 7 марта, 2011 - 19:49
#permalink

Здравствуйте, Илья!

У меня вот какая проблема. Начал строить свой велосипед COMET сервер на Node.js, и столкнулся с интересным эффектом. Если послать два запроса subscribe и потом один publish, то броадкаст распространится только на первый subscribe. И только после этого висящий клиент из второго "subscribe"-запроса запишется в массив клиентов. Это не то поведение, которое я ожидал.

Более того, я решил проверить не сошел ли я с ума, и попробовал ваш пример из этой статьи. Результат был таким же: открываем в двух окнах http://127.0.0.1:8000/subscribe - оба правильно виснут. Потом в другом окне открываем http://127.0.0.1:8000/publish?message=foo. В это окно возвращается "ok", но лишь в одно из двух висящих окон подписчиков вываливается сообщение foo. Если снова послать "publish", то сообщение вывалится во второе окно. И т.д. сколько бы подписчиков не было. Такое впечатление, что запросы складываются в какую-то странную очередь, однако непонятно как таким образом может выполниться publish...

Может дело в версии node.js?

У меня (оказывается) 0.2.0.


Автор: Марк (не зарегистрирован), дата: 7 марта, 2011 - 20:44
#permalink

К моему предыдущему комментарию (если его опубликуют).

Проблема решена, и, думаю, другим это может тоже оказаться полезным. Нельзя подобные вещи тестировать на нескольких вкладках одного браузера. Например, Chrome некорректно отрабатывает подобный эксперимент, и посылает второй запрос subscribe после того, как придет ответ на первый publish.

Такие вещи нужно тестить простыми браузерами, например links.


Автор: Гость (не зарегистрирован), дата: 7 марта, 2011 - 20:48
#permalink

В общем-то мой предыдущий комент не опубликовали. Он состоял в том, что пример на node.js некорерктно работает (ответ приходит не на все висящие клиенты, а только на один, после чего остальные клиенты ставятся в очередь). но, как оказалось, это особенности браузера. Всем спасибо, и извините за путанность изложения.


 
Текущий раздел
Поиск по сайту
Содержание

Учебник javascript

Основные элементы языка

Сундучок с инструментами

Интерфейсы

Все об AJAX

Оптимизация

Разное

Дерево всех статей

Последние темы на форуме
Forum