您的位置 首页 > 腾讯云社区

聊聊debezium的SnapshotChangeRecordEmitter---codecraft

本文主要研究一下debezium的SnapshotChangeRecordEmitter

SnapshotChangeRecordEmitter

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/relational/SnapshotChangeRecordEmitter.java

public class SnapshotChangeRecordEmitter extends RelationalChangeRecordEmitter { private final Object[] row; public SnapshotChangeRecordEmitter(OffsetContext offset, Object[] row, Clock clock) { super(offset, clock); this.row = row; } @Override protected Operation getOperation() { return Operation.READ; } @Override protected Object[] getOldColumnValues() { throw new UnsupportedOperationException("Can't get old row values for READ record"); } @Override protected Object[] getNewColumnValues() { return row; } }

SnapshotChangeRecordEmitter继承了RelationalChangeRecordEmitter,其构造器接收row,其getOperation方法返回Operation.READ,其getOldColumnValues方法抛出UnsupportedOperationException,其getNewColumnValues返回row

RelationalChangeRecordEmitter

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/relational/RelationalChangeRecordEmitter.java

public abstract class RelationalChangeRecordEmitter extends AbstractChangeRecordEmitter<TableSchema> { protected final Logger logger = LoggerFactory.getLogger(getClass()); public RelationalChangeRecordEmitter(OffsetContext offsetContext, Clock clock) { super(offsetContext, clock); } @Override public void emitChangeRecords(DataCollectionSchema schema, Receiver receiver) throws InterruptedException { TableSchema tableSchema = (TableSchema) schema; Operation operation = getOperation(); switch (operation) { case CREATE: emitCreateRecord(receiver, tableSchema); break; case READ: emitReadRecord(receiver, tableSchema); break; case UPDATE: emitUpdateRecord(receiver, tableSchema); break; case DELETE: emitDeleteRecord(receiver, tableSchema); break; default: throw new IllegalArgumentException("Unsupported operation: " + operation); } } @Override protected void emitCreateRecord(Receiver receiver, TableSchema tableSchema) throws InterruptedException { Object[] newColumnValues = getNewColumnValues(); Object newKey = tableSchema.keyFromColumnData(newColumnValues); Struct newValue = tableSchema.valueFromColumnData(newColumnValues); Struct envelope = tableSchema.getEnvelopeSchema().create(newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant()); if (skipEmptyMessages() && (newColumnValues == null || newColumnValues.length == 0)) { logger.warn("no new values found for table '{}' from create message at '{}'; skipping record", tableSchema, getOffset().getSourceInfo()); return; } receiver.changeRecord(tableSchema, Operation.CREATE, newKey, envelope, getOffset()); } @Override protected void emitReadRecord(Receiver receiver, TableSchema tableSchema) throws InterruptedException { Object[] newColumnValues = getNewColumnValues(); Object newKey = tableSchema.keyFromColumnData(newColumnValues); Struct newValue = tableSchema.valueFromColumnData(newColumnValues); Struct envelope = tableSchema.getEnvelopeSchema().read(newValue, getOffset().getSourceInfo(), 
getClock().currentTimeAsInstant()); receiver.changeRecord(tableSchema, Operation.READ, newKey, envelope, getOffset()); } @Override protected void emitUpdateRecord(Receiver receiver, TableSchema tableSchema) throws InterruptedException { Object[] oldColumnValues = getOldColumnValues(); Object[] newColumnValues = getNewColumnValues(); Object oldKey = tableSchema.keyFromColumnData(oldColumnValues); Object newKey = tableSchema.keyFromColumnData(newColumnValues); Struct newValue = tableSchema.valueFromColumnData(newColumnValues); Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues); if (skipEmptyMessages() && (newColumnValues == null || newColumnValues.length == 0)) { logger.warn("no new values found for table '{}' from update message at '{}'; skipping record", tableSchema, getOffset().getSourceInfo()); return; } // some configurations does not provide old values in case of updates // in this case we handle all updates as regular ones if (oldKey == null || Objects.equals(oldKey, newKey)) { Struct envelope = tableSchema.getEnvelopeSchema().update(oldValue, newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant()); receiver.changeRecord(tableSchema, Operation.UPDATE, newKey, envelope, getOffset()); } // PK update -> emit as delete and re-insert with new key else { Struct envelope = tableSchema.getEnvelopeSchema().delete(oldValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant()); receiver.changeRecord(tableSchema, Operation.DELETE, oldKey, envelope, getOffset()); envelope = tableSchema.getEnvelopeSchema().create(newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant()); receiver.changeRecord(tableSchema, Operation.CREATE, newKey, envelope, getOffset()); } } @Override protected void emitDeleteRecord(Receiver receiver, TableSchema tableSchema) throws InterruptedException { Object[] oldColumnValues = getOldColumnValues(); Object oldKey = tableSchema.keyFromColumnData(oldColumnValues); Struct oldValue = 
tableSchema.valueFromColumnData(oldColumnValues); if (skipEmptyMessages() && (oldColumnValues == null || oldColumnValues.length == 0)) { logger.warn("no old values found for table '{}' from delete message at '{}'; skipping record", tableSchema, getOffset().getSourceInfo()); return; } Struct envelope = tableSchema.getEnvelopeSchema().delete(oldValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant()); receiver.changeRecord(tableSchema, Operation.DELETE, oldKey, envelope, getOffset()); } /** * Returns the operation done by the represented change. */ protected abstract Operation getOperation(); /** * Returns the old row state in case of an UPDATE or DELETE. */ protected abstract Object[] getOldColumnValues(); /** * Returns the new row state in case of a CREATE or READ. */ protected abstract Object[] getNewColumnValues(); /** * Whether empty data messages should be ignored. * * @return true if empty data messages coming from data source should be ignored.</br> * Typical use case are PostgreSQL changes without FULL replica identity. */ protected boolean skipEmptyMessages() { return false; } }

RelationalChangeRecordEmitter继承了AbstractChangeRecordEmitter,其泛型为TableSchema;其emitChangeRecords方法根据不同的operation执行不同的emit方法;这些emit方法主要是构造key及envelope,然后执行receiver.changeRecord

AbstractChangeRecordEmitter

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/pipeline/AbstractChangeRecordEmitter.java

public abstract class AbstractChangeRecordEmitter<T extends DataCollectionSchema> implements ChangeRecordEmitter { private final OffsetContext offsetContext; private final Clock clock; public AbstractChangeRecordEmitter(OffsetContext offsetContext, Clock clock) { this.offsetContext = offsetContext; this.clock = clock; } @Override @SuppressWarnings({ "unchecked" }) public void emitChangeRecords(DataCollectionSchema schema, Receiver receiver) throws InterruptedException { Operation operation = getOperation(); switch (operation) { case CREATE: emitCreateRecord(receiver, (T) schema); break; case READ: emitReadRecord(receiver, (T) schema); break; case UPDATE: emitUpdateRecord(receiver, (T) schema); break; case DELETE: emitDeleteRecord(receiver, (T) schema); break; default: throw new IllegalArgumentException("Unsupported operation: " + operation); } } @Override public OffsetContext getOffset() { return offsetContext; } /** * Returns the clock of the change record(s) emitted. */ public Clock getClock() { return clock; } /** * Returns the operation associated with the change. */ protected abstract Operation getOperation(); /** * Emits change record(s) associated with a snapshot. * * @param receiver the handler for which the emitted record should be dispatched * @param schema the schema */ protected abstract void emitReadRecord(Receiver receiver, T schema) throws InterruptedException; /** * Emits change record(s) associated with an insert operation. * * @param receiver the handler for which the emitted record should be dispatched * @param schema the schema */ protected abstract void emitCreateRecord(Receiver receiver, T schema) throws InterruptedException; /** * Emits change record(s) associated with an update operation. 
* * @param receiver the handler for which the emitted record should be dispatched * @param schema the schema */ protected abstract void emitUpdateRecord(Receiver receiver, T schema) throws InterruptedException; /** * Emits change record(s) associated with a delete operation. * * @param receiver the handler for which the emitted record should be dispatched * @param schema the schema */ protected abstract void emitDeleteRecord(Receiver receiver, T schema) throws InterruptedException; }

AbstractChangeRecordEmitter实现了ChangeRecordEmitter接口,其提供了emitChangeRecords方法,封装了针对不同operation的调用,同时定义了emitCreateRecord、emitReadRecord、emitUpdateRecord、emitDeleteRecord方法供子类实现

ChangeRecordEmitter

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/pipeline/spi/ChangeRecordEmitter.java

public interface ChangeRecordEmitter { /** * Emits the change record(s) corresponding to data change represented by this emitter. */ void emitChangeRecords(DataCollectionSchema schema, Receiver receiver) throws InterruptedException; /** * Returns the offset of the change record(s) emitted. */ OffsetContext getOffset(); public interface Receiver { void changeRecord(DataCollectionSchema schema, Operation operation, Object key, Struct value, OffsetContext offset) throws InterruptedException; } }

ChangeRecordEmitter接口定义了emitChangeRecords、getOffset方法,同时还定义了Receiver接口,该接口定义了changeRecord方法

小结

SnapshotChangeRecordEmitter继承了RelationalChangeRecordEmitter,其构造器接收row,其getOperation方法返回Operation.READ,其getOldColumnValues方法抛出UnsupportedOperationException,其getNewColumnValues返回row

doc

SnapshotChangeRecordEmitter

---来自腾讯云社区的---codecraft

关于作者: 瞎采新闻

这里可以显示个人介绍!这里可以显示个人介绍!

热门文章

留言与评论(共有 0 条评论)
   
验证码: