写在最前面

该实现已经发布,可以通过以下方式下载:

go get gitee.com/xqhero/fastcgi

定义常量 和 统一的变量

// Upper bounds for one record's content and padding. They match the widths
// of the header's ContentLength (uint16) and PaddingLength (uint8) fields.
const (
	MAXCONTENT = 1<<16 - 1 // 65535: largest content length of a single record
	MAXPAD     = 1<<8 - 1  // 255: largest padding length of a single record
)
// rctype is the one-byte record type stored in a record header.
type rctype uint8

// Record types, numbered consecutively starting at 1.
const (
	FCGI_BEGIN_REQUEST rctype = iota + 1
	FCGI_ABORT_REQUEST
	FCGI_END_REQUEST
	FCGI_PARAMS
	FCGI_STDIN
	FCGI_STDOUT
	FCGI_STDERR
	FCGI_DATA
	FCGI_GET_VALUES
	FCGI_GET_VALUES_RESULT
	FCGI_UNKNOWN_TYPE
)

// Roles a client may request in a begin-request record.
const (
	ROLE_FCGI_RESPONSE   = 1 // responder
	ROLE_FCGI_AUTHORIZER = 2 // authorizer
	ROLE_FCGI_FILTER     = 3 // filter
)

// Protocol status values carried in the body of an end-request record.
const (
	FCGI_REQUEST_COMPLETE = 0
	FCGI_CANT_MPX_CONN    = 1
	FCGI_OVERLOADED       = 2
	FCGI_UNKNOW_ROLE      = 3
)

var pad [MAXPAD]byte   // never written in this section, so it stays all zeros; writeRecord slices it to append padding bytes

定义基本结构及方法

消息头

// FCGI_Header is the fixed 8-byte header that precedes every record. The
// field order and widths are the layout used when the header is read and
// written with encoding/binary in big-endian order.
type FCGI_Header struct {
	Version       uint8  // protocol version
	Ftype         rctype // record type
	RequestId     uint16 // request ID (two bytes on the wire)
	ContentLength uint16 // content length (two bytes on the wire)
	PaddingLength uint8  // number of padding bytes after the content
	Reserved      uint8  // reserved byte
}

// init prepares the header for an outgoing record of the given type,
// request ID and content length. Version is set to 1 and the padding length
// is chosen so that content plus padding is a multiple of 8 bytes. Reserved
// is left as it was.
func (h *FCGI_Header) init(ftype rctype, reqId uint16, contentLen uint16) {
	// -contentLen & 7 is the number of bytes needed to reach the next
	// multiple of 8 (0 when contentLen is already aligned).
	padding := uint8(-contentLen & 7)

	h.Version, h.Ftype = 1, ftype
	h.RequestId, h.ContentLength = reqId, contentLen
	h.PaddingLength = padding
}

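**关于 `h.PaddingLength = uint8(-contentLen & 7)` 的计算说明:**

对于 a % b,当 b 为 2^n 时,可以用 a & (b-1) 代替取模运算。因此 -contentLen & 7 等价于 (-contentLen) % 8,也就是把 contentLen 补齐到 8 的整数倍所需的填充字节数(例如 contentLen = 13 时结果为 3)。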

请求开始结构体

// FCGI_BeginRequestBody is the body of a record whose type is
// FCGI_BEGIN_REQUEST (ftype = 1).
type FCGI_BeginRequestBody struct {
	Role     uint16   // requested role (two bytes on the wire)
	Flags    uint8    // flags byte; carries the keep-connection marker
	Reserved [5]uint8 // reserved bytes
}

// read decodes a begin-request body from content, which must be exactly
// 8 bytes long. Role is taken from the first two bytes (big-endian) and
// Flags from the third; the reserved bytes are ignored.
func (rb *FCGI_BeginRequestBody) read(content []byte) error {
	const bodyLen = 8
	if n := len(content); n != bodyLen {
		return errors.New("fcgi: invalid begin request record")
	}
	role, flags := binary.BigEndian.Uint16(content[:2]), content[2]
	rb.Role = role
	rb.Flags = flags
	return nil
}

record 结构体

// Record holds one record: its header plus a buffer large enough for the
// maximum content and padding, so a single Record can be reused for every
// read.
type Record struct {
	h   FCGI_Header                // record header
	buf [MAXCONTENT + MAXPAD]byte // content followed by padding
}

// read fills r with the next record from reader. The header is decoded in
// big-endian order and its version must be 1; then exactly
// ContentLength+PaddingLength bytes are read into the buffer. Any read error
// is returned unchanged.
func (r *Record) read(reader io.Reader) (err error) {
	if err = binary.Read(reader, binary.BigEndian, &r.h); err != nil {
		return err
	}
	if r.h.Version != 1 {
		return errors.New("invalid header version")
	}
	// Content and padding are consumed together so the stream is left
	// positioned at the next header.
	total := int(r.h.ContentLength) + int(r.h.PaddingLength)
	_, err = io.ReadFull(reader, r.buf[:total])
	return err
}

// content returns the content bytes of the most recently read record,
// without the padding. The slice aliases the record's internal buffer.
func (r *Record) content() []byte {
	n := r.h.ContentLength
	return r.buf[:n]
}

Server 定义及方法

// 定义FcgiServer结构
type FcgiServer struct {
    addr         string   // 地址
    listener     net.Listener
    handler        http.Handler
}
// 创建
func NewFcgiServer(addr string, handler http.Handler) *FcgiServer  {
    srv := &FcgiServer{
        addr: addr,
        listener: nil,
        handler: handler,
    }
    return srv
}

// Start listens on the server's TCP address and serves every accepted
// connection on its own goroutine. It blocks until listening or accepting
// fails and returns that error; it never returns nil. When no handler was
// configured, http.DefaultServeMux is used.
func (srv *FcgiServer) Start() error {
	listener, err := net.Listen("tcp", srv.addr)
	if err != nil {
		return err
	}
	// Release the listening socket once the accept loop gives up.
	defer listener.Close()
	srv.listener = listener
	if srv.handler == nil {
		srv.handler = http.DefaultServeMux
	}
	for {
		conn, err := srv.listener.Accept()
		if err != nil {
			return err
		}
		c := NewConnChild(conn, srv.handler)
		go c.Serve()
	}
}

ConnChild 结构与方法

// ConnChild serves the requests arriving on a single connection.
// (A context could be added here.)
type ConnChild struct {
	conn     *Conn               // wrapped connection
	handler  http.Handler        // handler that serves the requests
	mutex    sync.RWMutex        // guards requests
	requests map[uint16]*Request // in-flight requests by ID; several when the connection is multiplexed
}

// NewConnChild wraps conn and returns a ConnChild with an empty request
// table that will dispatch to handler.
func NewConnChild(conn io.ReadWriteCloser, handler http.Handler) *ConnChild {
	return &ConnChild{
		conn:     newConn(conn),
		handler:  handler,
		requests: map[uint16]*Request{},
	}
}

Request 结构

// Request is the state kept for one in-flight request.
type Request struct {
	pipeWriter *io.PipeWriter    // write end of the stdin pipe read by the serving goroutine
	reqId      uint16            // request ID
	params     map[string]string // parsed params
	buf        [1024]byte        // initial backing storage for rawParams
	rawParams  []byte            // raw params bytes collected before parsing
	keepAlive  bool              // whether the connection stays open after this request
}

// NewRequest returns a request for reqId with an empty params map.
// keepAlive is taken from the flagKeepAlive bit of flags.
func NewRequest(reqId uint16, flags uint8) *Request {
	req := new(Request)
	req.reqId = reqId
	req.params = make(map[string]string)
	req.keepAlive = flags&flagKeepAlive != 0
	// Start rawParams on the embedded array so small param sets need no
	// extra allocation.
	req.rawParams = req.buf[:0]
	return req
}

// ParseParams decodes the accumulated raw params into req.params and
// releases the raw buffer. Each pair is encoded as key length, value length,
// key bytes, value bytes, with the lengths decoded by readSize. Parsing
// stops silently at the first truncated or malformed pair; pairs decoded
// before that point are kept.
func (req *Request) ParseParams() {
	text := req.rawParams
	req.rawParams = nil
	for len(text) > 0 {
		// Key length.
		keylen, n := readSize(text)
		if n == 0 {
			return
		}
		text = text[n:]
		// Value length.
		valuelen, n := readSize(text)
		if n == 0 {
			return
		}
		text = text[n:]
		// Compare in 64 bits: the uint32 sum could wrap around, pass the
		// check and make the slicing below panic.
		if uint64(keylen)+uint64(valuelen) > uint64(len(text)) {
			return
		}
		key := string(text[:keylen])
		text = text[keylen:]
		value := string(text[:valuelen])
		text = text[valuelen:]
		// Record the key-value pair.
		req.params[key] = value
	}
}

Conn 结构

// Conn wraps the underlying connection together with a reusable output
// buffer and header.
type Conn struct {
	mutex sync.Mutex         // held while writing a record and while closing
	rwc   io.ReadWriteCloser // the underlying connection
	buf   bytes.Buffer       // output buffer, reused for every record
	h     FCGI_Header        // header, reused for every record
}

// newConn wraps rwc in a Conn.
func newConn(rwc io.ReadWriteCloser) *Conn {
	c := new(Conn)
	c.rwc = rwc
	return c
}

// Close closes the underlying connection while holding the mutex, so it
// does not run concurrently with a record write.
func (c *Conn) Close() error {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	err := c.rwc.Close()
	return err
}

// writeRecord sends one record of type rtype for request reqId whose content
// is bytes. Header, content and padding are assembled in the reusable buffer
// and written to the connection with a single Write call, all under the
// mutex.
//
// It returns an error, without writing anything, when bytes is longer than
// MAXCONTENT: the header's length field is 16 bits wide, so a longer payload
// would be announced with a truncated length and corrupt the stream.
// Otherwise it returns the first buffer or connection write error.
func (c *Conn) writeRecord(rtype rctype, reqId uint16, bytes []byte) error {
	if len(bytes) > MAXCONTENT {
		return errors.New("fcgi: record content exceeds 65535 bytes")
	}
	c.mutex.Lock()
	defer c.mutex.Unlock()
	c.buf.Reset() // reuse the buffer
	c.h.init(rtype, reqId, uint16(len(bytes)))
	if err := binary.Write(&c.buf, binary.BigEndian, c.h); err != nil {
		return err
	}
	if _, err := c.buf.Write(bytes); err != nil {
		return err
	}
	if _, err := c.buf.Write(pad[:c.h.PaddingLength]); err != nil {
		return err
	}
	// Send the assembled record.
	_, err := c.rwc.Write(c.buf.Bytes())
	return err
}

// withEndRequest sends an FCGI_END_REQUEST record for reqId. The 8-byte body
// holds appStatus as a big-endian uint32 followed by protocolStatus; the
// remaining bytes are zero.
func (c *Conn) withEndRequest(reqId uint16, appStatus uint32, protocolStatus uint8) error {
	var body [8]byte
	binary.BigEndian.PutUint32(body[:4], appStatus)
	body[4] = protocolStatus
	return c.writeRecord(FCGI_END_REQUEST, reqId, body[:])
}

ConnChild 方法

EventLoop

// Serve is the connection's event loop: it reads records one at a time and
// hands each to handleRecord until either step fails. On exit it first runs
// cleanUp and then closes the connection.
func (c *ConnChild) Serve() {
	// Deferred calls run in reverse order: cleanUp, then Close.
	defer c.conn.Close()
	defer c.cleanUp()

	rec := new(Record)
	for {
		err := rec.read(c.conn.rwc)
		if err != nil {
			fmt.Println("读取失败")
			return
		}
		err = c.handleRecord(rec)
		if err != nil {
			fmt.Println("操作失败")
			return
		}
	}
}

// cleanUp runs when the connection's loop exits. For every request that
// still has a stdin pipe it closes the write end with ErrConnClosed, so
// that readers of the pipe are woken up with that error.
func (c *ConnChild) cleanUp() {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	for id := range c.requests {
		if pw := c.requests[id].pipeWriter; pw != nil {
			pw.CloseWithError(ErrConnClosed)
		}
	}
}

handleRecord 处理数据包 核心

// handleRecord dispatches one incoming record.
//
// Records whose request ID is not in flight are dropped unless they are
// FCGI_BEGIN_REQUEST or FCGI_GET_VALUES. Otherwise:
//   - FCGI_BEGIN_REQUEST registers a new request; a role other than
//     responder is answered with an end-request record carrying
//     FCGI_UNKNOW_ROLE.
//   - FCGI_PARAMS content is accumulated; an empty record triggers parsing.
//   - FCGI_STDIN starts the serving goroutine on the first record and feeds
//     the body through a pipe; an empty record closes the pipe.
//   - Every other type is answered with FCGI_UNKNOWN_TYPE.
//     NOTE(review): this includes FCGI_GET_VALUES and FCGI_ABORT_REQUEST,
//     which have no dedicated handling here.
//
// A non-nil error makes the caller stop serving the connection. That
// happens for a reused request ID, a malformed begin-request body, or a
// failure to write a reply record.
func (c *ConnChild) handleRecord(record *Record) error {
	// Look up the request this record belongs to.
	c.mutex.RLock()
	request, ok := c.requests[record.h.RequestId]
	c.mutex.RUnlock()
	if !ok && record.h.Ftype != FCGI_BEGIN_REQUEST && record.h.Ftype != FCGI_GET_VALUES {
		return nil
	}
	switch record.h.Ftype {
	case FCGI_BEGIN_REQUEST:
		// Reusing an in-flight ID is an error.
		if request != nil {
			return errors.New("fcgi: received ID that is already in-flight")
		}

		var br FCGI_BeginRequestBody
		if err := br.read(record.content()); err != nil {
			return err
		}
		// Only the responder role is served; anything else is ended at once.
		if br.Role != ROLE_FCGI_RESPONSE {
			return c.conn.withEndRequest(record.h.RequestId, 0, FCGI_UNKNOW_ROLE)
		}
		request = NewRequest(record.h.RequestId, br.Flags)
		c.mutex.Lock()
		c.requests[record.h.RequestId] = request
		c.mutex.Unlock()
	case FCGI_PARAMS:
		// Parse only once all params have arrived (signalled by an empty
		// record).
		if content := record.content(); len(content) > 0 {
			request.rawParams = append(request.rawParams, content...)
			return nil
		}
		request.ParseParams()

	case FCGI_STDIN:
		content := record.content()
		if request.pipeWriter == nil {
			var pipeReader io.ReadCloser
			if len(content) > 0 {
				pipeReader, request.pipeWriter = io.Pipe()
			} else {
				pipeReader = emptyBody
			}
			// Serve the request on its own goroutine; the pipe streams the
			// body to it as further records arrive.
			go c.serveRequest(request, pipeReader)
		}
		if len(content) > 0 {
			// Best effort: the write fails once the serving goroutine has
			// closed its read end, which only means the body was not
			// consumed.
			request.pipeWriter.Write(content)
		} else if request.pipeWriter != nil {
			// An empty stdin record ends the body.
			request.pipeWriter.Close()
		}
	default:
		b := make([]byte, 8)
		b[0] = byte(record.h.Ftype)
		return c.conn.writeRecord(FCGI_UNKNOWN_TYPE, 0, b)
	}
	return nil
}

serveRequest 开启线程进行单独的数据处理 核心

// serveRequest runs on its own goroutine and serves one request. It builds
// an *http.Request from the parsed params with pr as the body and invokes
// the handler. If the request cannot be built, it records a 500 status and
// sends the error text as an FCGI_STDERR record. It then forces the
// response headers out, closes the response stream, removes the request
// from the table, sends the end-request record and closes pr. When the
// request did not ask for keep-alive, the connection is closed too.
func (c *ConnChild) serveRequest(req *Request, pr io.ReadCloser) {
	resp := NewResponse(c, req)
	if httpReq, err := RequestFromMap(req.params); err == nil {
		httpReq.Body = pr
		c.handler.ServeHTTP(resp, httpReq)
	} else {
		resp.WriteHeader(http.StatusInternalServerError)
		c.conn.writeRecord(FCGI_STDERR, req.reqId, []byte(err.Error()))
	}

	// An empty write makes sure the headers go out even when nothing was
	// written to the body.
	resp.Write([]byte{})
	resp.Close()

	c.mutex.Lock()
	delete(c.requests, req.reqId)
	c.mutex.Unlock()
	c.conn.withEndRequest(req.reqId, 0, FCGI_REQUEST_COMPLETE)

	// Any unread part of the body is not drained, only closed.
	pr.Close()
	if !req.keepAlive {
		c.conn.Close()
	}
}

// RequestFromMap builds an *http.Request from CGI-style params.
//
// REQUEST_METHOD and a valid SERVER_PROTOCOL are required. CONTENT_LENGTH,
// when present, must be a decimal integer. CONTENT_TYPE and every HTTP_*
// variable except HTTP_HOST become headers ("HTTP_FOO_BAR" -> "Foo-Bar").
// The URL comes from REQUEST_URI, or from SCRIPT_NAME + PATH_INFO
// (+ "?" + QUERY_STRING) when that is empty; if HTTP_HOST is set it is made
// absolute, with the https scheme when HTTPS is "on", "ON" or "1".
// RemoteAddr joins REMOTE_ADDR with REMOTE_PORT (0 when not numeric).
//
// On failure the request is nil and the error says which variable was
// rejected.
func RequestFromMap(params map[string]string) (*http.Request, error) {
	method := params["REQUEST_METHOD"]
	if method == "" {
		return nil, errors.New("cgi: no REQUEST_METHOD in environment")
	}
	proto := params["SERVER_PROTOCOL"]
	major, minor, ok := http.ParseHTTPVersion(proto)
	if !ok {
		return nil, errors.New("cgi: invalid SERVER_PROTOCOL version")
	}

	req := &http.Request{
		Method:     method,
		Proto:      proto,
		ProtoMajor: major,
		ProtoMinor: minor,
		Close:      true,
		Trailer:    http.Header{},
		Header:     http.Header{},
		Host:       params["HTTP_HOST"],
	}

	if raw := params["CONTENT_LENGTH"]; raw != "" {
		length, err := strconv.ParseInt(raw, 10, 64)
		if err != nil {
			return nil, errors.New("cgi: bad CONTENT_LENGTH in environment: " + raw)
		}
		req.ContentLength = length
	}
	if contentType := params["CONTENT_TYPE"]; contentType != "" {
		req.Header.Set("Content-Type", contentType)
	}

	// Header.Add canonicalizes the key, so "USER-AGENT" ends up as
	// "User-Agent".
	for name, value := range params {
		if name == "HTTP_HOST" || !strings.HasPrefix(name, "HTTP_") {
			continue
		}
		req.Header.Add(strings.ReplaceAll(name[len("HTTP_"):], "_", "-"), value)
	}

	target := params["REQUEST_URI"]
	if target == "" {
		// Fall back to SCRIPT_NAME, PATH_INFO and QUERY_STRING.
		target = params["SCRIPT_NAME"] + params["PATH_INFO"]
		if query := params["QUERY_STRING"]; query != "" {
			target += "?" + query
		}
	}

	switch params["HTTPS"] {
	case "on", "ON", "1":
		req.TLS = &tls.ConnectionState{HandshakeComplete: true}
	}

	if req.Host != "" {
		// A host name is available, so an absolute URL can be built.
		scheme := "http://"
		if req.TLS != nil {
			scheme = "https://"
		}
		absolute := scheme + req.Host + target
		parsed, err := url.Parse(absolute)
		if err != nil {
			return nil, errors.New("cgi: failed to parse host and REQUEST_URI into a URL: " + absolute)
		}
		req.URL = parsed
	} else {
		parsed, err := url.Parse(target)
		if err != nil {
			return nil, errors.New("cgi: failed to parse REQUEST_URI into a URL: " + target)
		}
		req.URL = parsed
	}

	// A missing or non-numeric port is deliberately mapped to 0.
	port, _ := strconv.Atoi(params["REMOTE_PORT"])
	req.RemoteAddr = net.JoinHostPort(params["REMOTE_ADDR"], strconv.Itoa(port))
	return req, nil
}

Response

// Response implements http.ResponseWriter on top of a FastCGI stdout stream.
type Response struct {
	req            *Request
	header         http.Header
	code           int        // status code recorded by WriteHeader
	wroteHeader    bool       // WriteHeader has been called
	wroteCGIHeader bool       // status line and headers have been emitted
	w              *bufWriter // buffered FCGI_STDOUT writer
}

// NewResponse returns a Response for req that writes FCGI_STDOUT records on
// c's connection. The header map starts out empty.
func NewResponse(c *ConnChild, req *Request) *Response {
	stdout := newWriter(c.conn, FCGI_STDOUT, req.reqId)
	return &Response{
		req:    req,
		header: make(http.Header),
		w:      stdout,
	}
}

// Header returns the response's header map.
func (resp *Response) Header() http.Header {
	return resp.header
}

// Write writes p to the response body. The first call records a 200 status
// if none was set and emits the status line and headers before the body.
// It returns the result of the buffered write.
func (resp *Response) Write(p []byte) (int, error) {
	if !resp.wroteHeader {
		resp.WriteHeader(http.StatusOK)
	}
	// writeCGIHeader returns immediately once the headers are out, so it is
	// safe to call on every write.
	resp.writeCGIHeader(p)
	n, err := resp.w.Write(p)
	return n, err
}

// WriteHeader records statusCode for the response; only the first call has
// an effect. Nothing is sent here. For 304 Not Modified the body-related
// headers are removed, and a Date header is added when none is set.
func (resp *Response) WriteHeader(statusCode int) {
	if resp.wroteHeader {
		return
	}
	resp.wroteHeader, resp.code = true, statusCode

	if statusCode == http.StatusNotModified {
		// Must not have body.
		for _, name := range [...]string{"Content-Type", "Content-Length", "Transfer-Encoding"} {
			resp.header.Del(name)
		}
	}

	if resp.header.Get("Date") == "" {
		now := time.Now().UTC()
		resp.header.Set("Date", now.Format(http.TimeFormat))
	}
}
// writeCGIHeader emits the "Status:" line, the headers and the blank line
// that ends them, then flushes. It runs at most once per response. When no
// Content-Type is set and the status is not 304, the type is sniffed from p.
func (r *Response) writeCGIHeader(p []byte) {
	if r.wroteCGIHeader {
		return
	}
	r.wroteCGIHeader = true

	fmt.Fprintf(r.w, "Status: %d %s\r\n", r.code, http.StatusText(r.code))
	if r.code != http.StatusNotModified {
		if _, hasType := r.header["Content-Type"]; !hasType {
			r.header.Set("Content-Type", http.DetectContentType(p))
		}
	}
	r.header.Write(r.w)
	r.w.WriteString("\r\n")
	r.w.Flush()
}

// Flush records a 200 status if none has been set yet and flushes whatever
// is buffered in the writer.
func (r *Response) Flush() {
	if headerPending := !r.wroteHeader; headerPending {
		r.WriteHeader(http.StatusOK)
	}
	r.w.Flush()
}

// Close flushes the response and closes its writer, returning the writer's
// close error.
func (r *Response) Close() error {
	r.Flush()
	err := r.w.Close()
	return err
}

bufWriter 与 response绑定的结构 主要是做包的转换

// bufWriter is a buffered writer bound to a closer for the stream beneath
// it.
type bufWriter struct {
	closer io.Closer
	*bufio.Writer
}

// newWriter returns a bufWriter that turns writes into records of recType
// for reqId on c. The buffer holds MAXCONTENT bytes.
func newWriter(c *Conn, recType rctype, reqId uint16) *bufWriter {
	stream := &streamWriter{c: c, recType: recType, reqId: reqId}
	return &bufWriter{
		closer: stream,
		Writer: bufio.NewWriterSize(stream, MAXCONTENT),
	}
}

// Close flushes buffered data and then closes the underlying stream. The
// stream is closed even when the flush fails; the flush error takes
// precedence.
func (w *bufWriter) Close() error {
	flushErr := w.Writer.Flush()
	closeErr := w.closer.Close()
	if flushErr != nil {
		return flushErr
	}
	return closeErr
}

// streamWriter turns byte writes into records of one type for one request.
type streamWriter struct {
	c       *Conn
	recType rctype
	reqId   uint16
}

// Write sends b as one or more records, each holding at most MAXCONTENT
// bytes. It returns the number of bytes sent before the first error. An
// empty b sends nothing.
func (s *streamWriter) Write(b []byte) (int, error) {
	sent := 0
	for sent < len(b) {
		end := sent + MAXCONTENT
		if end > len(b) {
			end = len(b)
		}
		if err := s.c.writeRecord(s.recType, s.reqId, b[sent:end]); err != nil {
			return sent, err
		}
		sent = end
	}
	return sent, nil
}

// Close writes an empty record, which marks the end of the stream.
func (s *streamWriter) Close() error {
	var endOfStream []byte
	return s.c.writeRecord(s.recType, s.reqId, endOfStream)
}

util 工具

// readSize decodes one name-value length from the start of s and returns the
// length together with the number of bytes consumed.
//
// A first byte below 128 is the length itself (1 byte consumed). Otherwise
// the length is a 4-byte big-endian value whose top bit is only a marker and
// is cleared from the result (4 bytes consumed). When s is empty or too
// short for the 4-byte form, it returns 0, 0.
func readSize(s []byte) (uint32, int) {
	if len(s) == 0 {
		return 0, 0
	}
	size, n := uint32(s[0]), 1
	if size&(1<<7) != 0 {
		if len(s) < 4 {
			return 0, 0
		}
		n = 4
		// Drop the marker bit; keeping it would inflate every long length
		// by 2^31.
		size = binary.BigEndian.Uint32(s) &^ (1 << 31)
	}
	return size, n
}

最终实验

package main

import (
    "fastcgi"
    "fmt"
    "io/ioutil"
    "net/http"
)

// main registers a handler that serves ./index.html for every path on the
// default mux and starts a FastCGI server on 127.0.0.1:9000. Passing a nil
// handler makes the server fall back to http.DefaultServeMux.
func main() {
	http.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
		bytes, err := ioutil.ReadFile("./index.html")
		if err != nil {
			// Report the failure instead of answering 200 with an empty
			// body.
			http.Error(writer, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
			return
		}
		writer.Write(bytes)
	})
	server := fastcgi.NewFcgiServer("127.0.0.1:9000", nil)
	if err := server.Start(); err != nil {
		// Include the cause so a failed start can be diagnosed.
		fmt.Println("fcgiserver start error:", err)
	}
}

postman 实验

附结构体关系

结构体关系

文档更新时间: 2021-01-22 03:12   作者:周国强