1717
1818import com .fasterxml .jackson .core .JsonProcessingException ;
1919import com .github .jcustenborder .kafka .connect .utils .VersionUtil ;
20- import com .github .jcustenborder .kafka .connect .utils .data .SinkOffsetState ;
21- import com .github .jcustenborder .kafka .connect .utils .data .TopicPartitionCounter ;
2220import com .github .jcustenborder .kafka .connect .utils .jackson .ObjectMapperFactory ;
2321import com .google .common .base .Charsets ;
2422import io .lettuce .core .KeyValue ;
2523import io .lettuce .core .RedisFuture ;
24+ import org .apache .kafka .clients .consumer .OffsetAndMetadata ;
2625import org .apache .kafka .common .TopicPartition ;
2726import org .apache .kafka .connect .errors .DataException ;
2827import org .apache .kafka .connect .errors .RetriableException ;
@@ -146,8 +145,6 @@ public void put(Collection<SinkRecord> records) {
146145
147146 SinkOperation operation = SinkOperation .NONE ;
148147
149- TopicPartitionCounter counter = new TopicPartitionCounter ();
150-
151148 for (SinkRecord record : records ) {
152149 log .trace ("put() - Processing record " + formatLocation (record ));
153150 if (null == record .key ()) {
@@ -182,7 +179,6 @@ public void put(Collection<SinkRecord> records) {
182179 operations .add (operation );
183180 }
184181 operation .add (key , value );
185- counter .increment (record .topic (), record .kafkaPartition (), record .kafkaOffset ());
186182 }
187183
188184 log .debug (
@@ -191,33 +187,50 @@ public void put(Collection<SinkRecord> records) {
191187 records .size ()
192188 );
193189
194- final List <SinkOffsetState > offsetData = counter .offsetStates ();
195- if (!offsetData .isEmpty ()) {
196- operation = SinkOperation .create (SinkOperation .Type .SET , this .config , offsetData .size ());
197- operations .add (operation );
198- for (SinkOffsetState e : offsetData ) {
199- final byte [] key = String .format ("__kafka.offset.%s.%s" , e .topic (), e .partition ()).getBytes (Charsets .UTF_8 );
200- final byte [] value ;
201- try {
202- value = ObjectMapperFactory .INSTANCE .writeValueAsBytes (e );
203- } catch (JsonProcessingException e1 ) {
204- throw new DataException (e1 );
205- }
206- operation .add (key , value );
207- log .trace ("put() - Setting offset: {}" , e );
208- }
209- }
210-
211190 for (SinkOperation op : operations ) {
212191 log .debug ("put() - Executing {} operation with {} values" , op .type , op .size ());
213192 try {
214193 op .execute (this .session .asyncCommands ());
215194 } catch (InterruptedException e ) {
195+ log .warn ("Exception thrown while executing operation" , e );
216196 throw new RetriableException (e );
217197 }
218198 }
219199 }
220200
201+ @ Override
202+ public void flush (Map <TopicPartition , OffsetAndMetadata > currentOffsets ) {
203+ SinkOperation operation = SinkOperation .create (SinkOperation .Type .SET , this .config , currentOffsets .size ());
204+
205+ List <SinkOffsetState > states = currentOffsets
206+ .entrySet ().stream ()
207+ .map (e -> ImmutableSinkOffsetState .builder ()
208+ .topic (e .getKey ().topic ())
209+ .partition (e .getKey ().partition ())
210+ .offset (e .getValue ().offset ())
211+ .build ()
212+ ).collect (Collectors .toList ());
213+
214+ for (SinkOffsetState e : states ) {
215+ final byte [] key = String .format ("__kafka.offset.%s.%s" , e .topic (), e .partition ()).getBytes (Charsets .UTF_8 );
216+ final byte [] value ;
217+ try {
218+ value = ObjectMapperFactory .INSTANCE .writeValueAsBytes (e );
219+ } catch (JsonProcessingException e1 ) {
220+ throw new DataException (e1 );
221+ }
222+ operation .add (key , value );
223+ log .trace ("put() - Setting offset: {}" , e );
224+ }
225+
226+ try {
227+ operation .execute (this .session .asyncCommands ());
228+ } catch (InterruptedException e ) {
229+ log .warn ("Exception thrown while executing operation" , e );
230+ throw new RetriableException (e );
231+ }
232+ }
233+
221234 private static String redisOffsetKey (TopicPartition topicPartition ) {
222235 return String .format ("__kafka.offset.%s.%s" , topicPartition .topic (), topicPartition .partition ());
223236 }
0 commit comments