blob: 5dd7fd26c3ddb1273c6960232fe0cc45e7b603ef [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001/**
2 * Copyright 2017 Confluent Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package kafka
18
19import (
20 "fmt"
21 "strconv"
22)
23
24/*
25#include <stdlib.h>
26#include <librdkafka/rdkafka.h>
27
28static int64_t _c_rdkafka_offset_tail(int64_t rel) {
29 return RD_KAFKA_OFFSET_TAIL(rel);
30}
31*/
32import "C"
33
34// Offset type (int64) with support for canonical names
35type Offset int64
36
37// OffsetBeginning represents the earliest offset (logical)
38const OffsetBeginning = Offset(C.RD_KAFKA_OFFSET_BEGINNING)
39
40// OffsetEnd represents the latest offset (logical)
41const OffsetEnd = Offset(C.RD_KAFKA_OFFSET_END)
42
43// OffsetInvalid represents an invalid/unspecified offset
44const OffsetInvalid = Offset(C.RD_KAFKA_OFFSET_INVALID)
45
46// OffsetStored represents a stored offset
47const OffsetStored = Offset(C.RD_KAFKA_OFFSET_STORED)
48
49func (o Offset) String() string {
50 switch o {
51 case OffsetBeginning:
52 return "beginning"
53 case OffsetEnd:
54 return "end"
55 case OffsetInvalid:
56 return "unset"
57 case OffsetStored:
58 return "stored"
59 default:
60 return fmt.Sprintf("%d", int64(o))
61 }
62}
63
64// Set offset value, see NewOffset()
65func (o *Offset) Set(offset interface{}) error {
66 n, err := NewOffset(offset)
67
68 if err == nil {
69 *o = n
70 }
71
72 return err
73}
74
75// NewOffset creates a new Offset using the provided logical string, or an
76// absolute int64 offset value.
77// Logical offsets: "beginning", "earliest", "end", "latest", "unset", "invalid", "stored"
78func NewOffset(offset interface{}) (Offset, error) {
79
80 switch v := offset.(type) {
81 case string:
82 switch v {
83 case "beginning":
84 fallthrough
85 case "earliest":
86 return Offset(OffsetBeginning), nil
87
88 case "end":
89 fallthrough
90 case "latest":
91 return Offset(OffsetEnd), nil
92
93 case "unset":
94 fallthrough
95 case "invalid":
96 return Offset(OffsetInvalid), nil
97
98 case "stored":
99 return Offset(OffsetStored), nil
100
101 default:
102 off, err := strconv.Atoi(v)
103 return Offset(off), err
104 }
105
106 case int:
107 return Offset((int64)(v)), nil
108 case int64:
109 return Offset(v), nil
110 default:
111 return OffsetInvalid, newErrorFromString(ErrInvalidArg,
112 fmt.Sprintf("Invalid offset type: %t", v))
113 }
114}
115
116// OffsetTail returns the logical offset relativeOffset from current end of partition
117func OffsetTail(relativeOffset Offset) Offset {
118 return Offset(C._c_rdkafka_offset_tail(C.int64_t(relativeOffset)))
119}
120
121// offsetsForTimes looks up offsets by timestamp for the given partitions.
122//
123// The returned offset for each partition is the earliest offset whose
124// timestamp is greater than or equal to the given timestamp in the
125// corresponding partition.
126//
127// The timestamps to query are represented as `.Offset` in the `times`
128// argument and the looked up offsets are represented as `.Offset` in the returned
129// `offsets` list.
130//
131// The function will block for at most timeoutMs milliseconds.
132//
133// Duplicate Topic+Partitions are not supported.
134// Per-partition errors may be returned in the `.Error` field.
135func offsetsForTimes(H Handle, times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error) {
136 cparts := newCPartsFromTopicPartitions(times)
137 defer C.rd_kafka_topic_partition_list_destroy(cparts)
138 cerr := C.rd_kafka_offsets_for_times(H.gethandle().rk, cparts, C.int(timeoutMs))
139 if cerr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
140 return nil, newError(cerr)
141 }
142
143 return newTopicPartitionsFromCparts(cparts), nil
144}