blob: d59f5b5d1e5ed32dc9ef8e0e47e106a2ea8d9859 [file] [log] [blame]
Don Newton379ae252019-04-01 12:17:06 -04001// Copyright (C) MongoDB, Inc. 2017-present.
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may
4// not use this file except in compliance with the License. You may obtain
5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6
7package topology
8
9import (
10 "context"
11 "net"
12
13 "strings"
14
15 "github.com/mongodb/mongo-go-driver/x/network/command"
16 "github.com/mongodb/mongo-go-driver/x/network/connection"
17 "github.com/mongodb/mongo-go-driver/x/network/description"
18 "github.com/mongodb/mongo-go-driver/x/network/wiremessage"
19)
20
21// sconn is a wrapper around a connection.Connection. This type is returned by
22// a Server so that it can track network errors and when a non-timeout network
23// error is returned, the pool on the server can be cleared.
24type sconn struct {
25 connection.Connection
26 s *Server
27 id uint64
28}
29
30var notMasterCodes = []int32{10107, 13435}
31var recoveringCodes = []int32{11600, 11602, 13436, 189, 91}
32
33func (sc *sconn) ReadWireMessage(ctx context.Context) (wiremessage.WireMessage, error) {
34 wm, err := sc.Connection.ReadWireMessage(ctx)
35 if err != nil {
36 sc.processErr(err)
37 } else {
38 e := command.DecodeError(wm)
39 sc.processErr(e)
40 }
41 return wm, err
42}
43
44func (sc *sconn) WriteWireMessage(ctx context.Context, wm wiremessage.WireMessage) error {
45 err := sc.Connection.WriteWireMessage(ctx, wm)
46 sc.processErr(err)
47 return err
48}
49
50func (sc *sconn) processErr(err error) {
51 // TODO(GODRIVER-524) handle the rest of sdam error handling
52 // Invalidate server description if not master or node recovering error occurs
53 if cerr, ok := err.(command.Error); ok && (isRecoveringError(cerr) || isNotMasterError(cerr)) {
54 desc := sc.s.Description()
55 desc.Kind = description.Unknown
56 desc.LastError = err
57 // updates description to unknown
58 sc.s.updateDescription(desc, false)
59 }
60
61 ne, ok := err.(connection.NetworkError)
62 if !ok {
63 return
64 }
65
66 if netErr, ok := ne.Wrapped.(net.Error); ok && netErr.Timeout() {
67 return
68 }
69 if ne.Wrapped == context.Canceled || ne.Wrapped == context.DeadlineExceeded {
70 return
71 }
72
73 desc := sc.s.Description()
74 desc.Kind = description.Unknown
75 desc.LastError = err
76 // updates description to unknown
77 sc.s.updateDescription(desc, false)
78}
79
80func isRecoveringError(err command.Error) bool {
81 for _, c := range recoveringCodes {
82 if c == err.Code {
83 return true
84 }
85 }
86 return strings.Contains(err.Error(), "node is recovering")
87}
88
89func isNotMasterError(err command.Error) bool {
90 for _, c := range notMasterCodes {
91 if c == err.Code {
92 return true
93 }
94 }
95 return strings.Contains(err.Error(), "not master")
96}