blob: 214774cbe347aae771ece81756b2b872efa2e83d [file] [log] [blame]
Don Newton379ae252019-04-01 12:17:06 -04001// Copyright (C) MongoDB, Inc. 2017-present.
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may
4// not use this file except in compliance with the License. You may obtain
5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6
7package driver
8
9import (
10 "context"
11
12 "github.com/mongodb/mongo-go-driver/mongo/options"
13 "github.com/mongodb/mongo-go-driver/x/bsonx"
14
15 "github.com/mongodb/mongo-go-driver/mongo/writeconcern"
16 "github.com/mongodb/mongo-go-driver/x/mongo/driver/session"
17 "github.com/mongodb/mongo-go-driver/x/mongo/driver/topology"
18 "github.com/mongodb/mongo-go-driver/x/mongo/driver/uuid"
19 "github.com/mongodb/mongo-go-driver/x/network/command"
20 "github.com/mongodb/mongo-go-driver/x/network/description"
21 "github.com/mongodb/mongo-go-driver/x/network/result"
22)
23
24// Insert handles the full cycle dispatch and execution of an insert command against the provided
25// topology.
26func Insert(
27 ctx context.Context,
28 cmd command.Insert,
29 topo *topology.Topology,
30 selector description.ServerSelector,
31 clientID uuid.UUID,
32 pool *session.Pool,
33 retryWrite bool,
34 opts ...*options.InsertManyOptions,
35) (result.Insert, error) {
36
37 ss, err := topo.SelectServer(ctx, selector)
38 if err != nil {
39 return result.Insert{}, err
40 }
41
42 // If no explicit session and deployment supports sessions, start implicit session.
43 if cmd.Session == nil && topo.SupportsSessions() {
44 cmd.Session, err = session.NewClientSession(pool, clientID, session.Implicit)
45 if err != nil {
46 return result.Insert{}, err
47 }
48 defer cmd.Session.EndSession()
49 }
50
51 insertOpts := options.MergeInsertManyOptions(opts...)
52
53 if insertOpts.BypassDocumentValidation != nil && ss.Description().WireVersion.Includes(4) {
54 cmd.Opts = append(cmd.Opts, bsonx.Elem{"bypassDocumentValidation", bsonx.Boolean(*insertOpts.BypassDocumentValidation)})
55 }
56 if insertOpts.Ordered != nil {
57 cmd.Opts = append(cmd.Opts, bsonx.Elem{"ordered", bsonx.Boolean(*insertOpts.Ordered)})
58 }
59
60 // Execute in a single trip if retry writes not supported, or retry not enabled
61 if !retrySupported(topo, ss.Description(), cmd.Session, cmd.WriteConcern) || !retryWrite {
62 if cmd.Session != nil {
63 cmd.Session.RetryWrite = false // explicitly set to false to prevent encoding transaction number
64 }
65 return insert(ctx, cmd, ss, nil)
66 }
67
68 // TODO figure out best place to put retry write. Command shouldn't have to know about this field.
69 cmd.Session.RetryWrite = retryWrite
70 cmd.Session.IncrementTxnNumber()
71
72 res, originalErr := insert(ctx, cmd, ss, nil)
73
74 // Retry if appropriate
75 if cerr, ok := originalErr.(command.Error); ok && cerr.Retryable() ||
76 res.WriteConcernError != nil && command.IsWriteConcernErrorRetryable(res.WriteConcernError) {
77 ss, err := topo.SelectServer(ctx, selector)
78
79 // Return original error if server selection fails or new server does not support retryable writes
80 if err != nil || !retrySupported(topo, ss.Description(), cmd.Session, cmd.WriteConcern) {
81 return res, originalErr
82 }
83
84 return insert(ctx, cmd, ss, cerr)
85 }
86
87 return res, originalErr
88}
89
90func insert(
91 ctx context.Context,
92 cmd command.Insert,
93 ss *topology.SelectedServer,
94 oldErr error,
95) (result.Insert, error) {
96 desc := ss.Description()
97 conn, err := ss.Connection(ctx)
98 if err != nil {
99 if oldErr != nil {
100 return result.Insert{}, oldErr
101 }
102 return result.Insert{}, err
103 }
104
105 if !writeconcern.AckWrite(cmd.WriteConcern) {
106 go func() {
107 defer func() { _ = recover() }()
108 defer conn.Close()
109
110 _, _ = cmd.RoundTrip(ctx, desc, conn)
111 }()
112
113 return result.Insert{}, command.ErrUnacknowledgedWrite
114 }
115 defer conn.Close()
116
117 return cmd.RoundTrip(ctx, desc, conn)
118}