blob: 50596303462721f045d5b65eedc06419221aad50 [file] [log] [blame]
Don Newton379ae252019-04-01 12:17:06 -04001// Copyright (C) MongoDB, Inc. 2017-present.
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may
4// not use this file except in compliance with the License. You may obtain
5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6
7package command
8
9import (
10 "context"
11
12 "github.com/mongodb/mongo-go-driver/bson"
13 "github.com/mongodb/mongo-go-driver/mongo/writeconcern"
14 "github.com/mongodb/mongo-go-driver/x/bsonx"
15 "github.com/mongodb/mongo-go-driver/x/mongo/driver/session"
16 "github.com/mongodb/mongo-go-driver/x/network/description"
17 "github.com/mongodb/mongo-go-driver/x/network/result"
18 "github.com/mongodb/mongo-go-driver/x/network/wiremessage"
19)
20
// reservedCommandBufferBytes is the amount of buffer space in a message that
// the driver reserves for command overhead.
const reservedCommandBufferBytes = 16 * 1000
24
// Insert represents the insert command.
//
// The insert command inserts a set of documents into the database.
//
// Errors hit while decoding a server reply are stored on the command and are
// reported by the Result and Err methods.
type Insert struct {
	// ContinueOnError is handed to the batch round trip. It is also switched on
	// while encoding when Opts contains an "ordered" option set to false.
	ContinueOnError bool
	// Clock is the cluster clock attached to every encoded write.
	Clock *session.ClusterClock
	// NS supplies the database and collection the documents are inserted into.
	NS Namespace
	// Docs are the documents to insert; they are split into batches on encode.
	Docs []bsonx.Doc
	// Opts are the command options encoded into every batch.
	Opts []bsonx.Elem
	// WriteConcern is attached to every encoded write.
	WriteConcern *writeconcern.WriteConcern
	// Session is attached to every encoded write and passed to the round trip.
	Session *session.Client

	// batches holds the encoded write batches; after a round trip it holds
	// whatever batches were returned as leftover so they can be retried.
	batches []*WriteBatch
	// result is the reply unmarshalled by decode.
	result result.Insert
	// err is the deferred error recorded by Decode/decode.
	err error
}
44
45// Encode will encode this command into a wire message for the given server description.
46func (i *Insert) Encode(desc description.SelectedServer) ([]wiremessage.WireMessage, error) {
47 err := i.encode(desc)
48 if err != nil {
49 return nil, err
50 }
51
52 return batchesToWireMessage(i.batches, desc)
53}
54
55func (i *Insert) encodeBatch(docs []bsonx.Doc, desc description.SelectedServer) (*WriteBatch, error) {
56 command, err := encodeBatch(docs, i.Opts, InsertCommand, i.NS.Collection)
57 if err != nil {
58 return nil, err
59 }
60
61 for _, opt := range i.Opts {
62 if opt.Key == "ordered" && !opt.Value.Boolean() {
63 i.ContinueOnError = true
64 break
65 }
66 }
67
68 return &WriteBatch{
69 &Write{
70 Clock: i.Clock,
71 DB: i.NS.DB,
72 Command: command,
73 WriteConcern: i.WriteConcern,
74 Session: i.Session,
75 },
76 len(docs),
77 }, nil
78}
79
80func (i *Insert) encode(desc description.SelectedServer) error {
81 batches, err := splitBatches(i.Docs, int(desc.MaxBatchCount), int(desc.MaxDocumentSize))
82 if err != nil {
83 return err
84 }
85
86 for _, docs := range batches {
87 cmd, err := i.encodeBatch(docs, desc)
88 if err != nil {
89 return err
90 }
91
92 i.batches = append(i.batches, cmd)
93 }
94 return nil
95}
96
97// Decode will decode the wire message using the provided server description. Errors during decoding
98// are deferred until either the Result or Err methods are called.
99func (i *Insert) Decode(desc description.SelectedServer, wm wiremessage.WireMessage) *Insert {
100 rdr, err := (&Write{}).Decode(desc, wm).Result()
101 if err != nil {
102 i.err = err
103 return i
104 }
105
106 return i.decode(desc, rdr)
107}
108
109func (i *Insert) decode(desc description.SelectedServer, rdr bson.Raw) *Insert {
110 i.err = bson.Unmarshal(rdr, &i.result)
111 return i
112}
113
114// Result returns the result of a decoded wire message and server description.
115func (i *Insert) Result() (result.Insert, error) {
116 if i.err != nil {
117 return result.Insert{}, i.err
118 }
119 return i.result, nil
120}
121
122// Err returns the error set on this command.
123func (i *Insert) Err() error { return i.err }
124
125// RoundTrip handles the execution of this command using the provided wiremessage.ReadWriter.
126//func (i *Insert) RoundTrip(ctx context.Context, desc description.SelectedServer, rw wiremessage.ReadWriter) (result.Insert, error) {
127func (i *Insert) RoundTrip(
128 ctx context.Context,
129 desc description.SelectedServer,
130 rw wiremessage.ReadWriter,
131) (result.Insert, error) {
132 if i.batches == nil {
133 err := i.encode(desc)
134 if err != nil {
135 return result.Insert{}, err
136 }
137 }
138
139 r, batches, err := roundTripBatches(
140 ctx, desc, rw,
141 i.batches,
142 i.ContinueOnError,
143 i.Session,
144 InsertCommand,
145 )
146
147 // if there are leftover batches, save them for retry
148 if batches != nil {
149 i.batches = batches
150 }
151
152 if err != nil {
153 return result.Insert{}, err
154 }
155
156 res := r.(result.Insert)
157 return res, nil
158}