Geventのsocketコラボレーションセキュリティ分析

5477 ワード

一般的にsocketの同時セキュリティについて議論するのは、スレッドのセキュリティを指す...ほとんどの場合、socketはスレッドで安全ではありません.
もちろん、いくつかのフレームワークはsocketをカプセル化し、スレッドが安全になる可能性があります.の例えばjavaのnettyフレームワークは、socketをchannelにカプセル化し、channelを1つのスレッドに閉じ込めると、このchannelのすべての読み書きがその存在するスレッドの中でシリアルに行われ、自然にスレッドが安全になります...
実は早くGeventのソースコードを见た时、すでにこの部分のものを见たことがあって、その时すでにgeventのsocketが协程安全ではないことを知っていて、つまりgevnetのsocketは异なる协程の中で同时に読んだり书いたりすることができません..
例えば、2つのコラボレーションでsocketを同時に呼び出すことはできません.recvメソッド...
でも、自分が今忘れているようなので、もう一度見て、ついでにブログを書いて記録して、後で忘れないようにして、資料が見つからないようにします.
ではなぜですか...ソースコードを分析しましょう.ここではsendメソッドを分析し、geventのsendメソッドの定義を見てみましょう.
    #                   ,           
    #           
    def send(self, data, flags=0, timeout=timeout_default):
        sock = self._sock
        if timeout is timeout_default:
            timeout = self.timeout
        try:
            return sock.send(data, flags)
        except error:
            #EWOULDBLOCK           ,      socket,             
            #                write_event  ,      
            ex = sys.exc_info()[1]
            if ex.args[0] != EWOULDBLOCK or timeout == 0.0:
                raise
            sys.exc_clear()
            self._wait(self._write_event)
            try:
                return sock.send(data, flags)
            except error:
                ex2 = sys.exc_info()[1]
                if ex2.args[0] == EWOULDBLOCK:
                    return 0
                raise

つまり、現在逆送信が行われていない場合は、_が呼び出されます.waitメソッドで待つ_write_イベントイベント
    #     watcher,   read  write  ,        
    def _wait(self, watcher, timeout_exc=timeout('timed out')):
        """Block the current greenlet until *watcher* has pending events.

        If *timeout* is non-negative, then *timeout_exc* is raised after *timeout* second has passed.
        By default *timeout_exc* is ``socket.timeout('timed out')``.

        If :func:`cancel_wait` is called, raise ``socket.error(EBADF, 'File descriptor was closed in another greenlet')``.
        """
        assert watcher.callback is None, 'This socket is already used by another greenlet: %r' % (watcher.callback, )
        if self.timeout is not None: #     ,   timeout,     ,    timeout
            #       ,              
            timeout = Timeout.start_new(self.timeout, timeout_exc, ref=False)
        else:
            timeout = None
        try:
            self.hub.wait(watcher)  # hub      watcher,         hub   ,    watcher         
        finally:
            if timeout is not None:
                timeout.cancel()

実はここは簡単です.hubの上で現在のsocketの書き込みイベントを待っているwatcherです.
    #    gevent.sleep              ,        
    #watcher    loop       ,    ,     
    #   loop    watcher,           
    def wait(self, watcher):
        waiter = Waiter() #      waiter  
        unique = object() 
        watcher.start(waiter.switch, unique) # watcher         waiter switch                
        try:
            result = waiter.get() #  waiter get  ,         sleep greenlet    ,     hub   
            assert result is unique, 'Invalid switch into %s: %r (expected %r)' % (getcurrent(), result, unique)
        finally:
            watcher.stop()

これは簡単でしょう.のwaiterイベントを作成し、watcherオブジェクトのstartメソッドを呼び出します.コールバックメソッドは、現在のwaiterオブジェクトのswitchメソッドに設定され、getメソッドを呼び出し、現在のコヒーレンスを切り替えます.の
ではIOタイプのwatcherのstartメソッドに来てみましょう.
#I/Owatcher   
cdef public class io(watcher) [object PyGeventIOObject, type PyGeventIO_Type]:

    WATCHER_BASE(io)  #               ,  libev  watcher    

    #          watcher
    #   callback       waiter   switch  ,  ,  IO               
    def start(self, object callback, *args, pass_events=False):
        CHECK_LOOP2(self.loop)
        if callback is None:
            raise TypeError('callback must be callable, not None')
        self.callback = callback
        if pass_events:
            self.args = (GEVENT_CORE_EVENTS, ) + args
        else:
            self.args = args
        LIBEV_UNREF
        libev.ev_io_start(self.loop._ptr, &self._watcher)  # libev loop      io watcher
        PYTHON_INCREF

    ACTIVE

ええと、これはcythonのコードなので、ちょっと違和感があります...では次にlibevのevを見てみましょうio_start関数の実装:
void noinline
ev_io_start (EV_P_ ev_io *w) EV_THROW
{
  int fd = w->fd;

  if (expect_false (ev_is_active (w)))
    return;

  assert (("libev: ev_io_start called with negative fd", fd >= 0));
  assert (("libev: ev_io_start called with illegal event mask", !(w->events & ~(EV__IOFDSET | EV_READ | EV_WRITE))));

  EV_FREQUENT_CHECK;

  ev_start (EV_A_ (W)w, 1);   //       active
  //     anfds           fd,      ,        
  array_needsize (ANFD, anfds, anfdmax, fd + 1, array_init_zero);
  //   watcher    fd wather     
  wlist_add (&anfds[fd].head, (WL)w);

  /* common bug, apparently */
  assert (("libev: ev_io_start called with corrupted watcher", ((WL)w)->next != (WL)w));

  fd_change (EV_A_ fd, w->events & EV__IOFDSET | EV_ANFD_REIFY);   //  fd         ,         loop   
  w->events &= ~EV__IOFDSET;

  EV_FREQUENT_CHECK;
}

ここでは、主に現在のwatcherオブジェクトをアクティブにし、このwatcherオブジェクトを現在のファイル記述子のwatcherチェーンテーブルのヘッダに配置します.のうん.つまりwlist_addメソッドでやるべきこと...実はここを見てsocketはきっと協程安全ではないことを知っています...
うん、皆さんもきっとわかったと思います...わからない話.コードをもう一度見ればわかります....