Informatica CDC PUBLISHER 1.3

Introducing PWX CDC Publisher:
The PWX (PowerExchange) CDC Publisher is a new Java-based tool that streams change data captured from a PWX data source to a target Apache Kafka messaging system.

High-level architecture of Informatica PWX CDC:
[Figure: high-level architecture of Informatica PWX CDC]

Architecture of the CDC Publisher:
[Figure: CDC Publisher architecture]

The PWX CDC Publisher consists of the following command-line utilities:

• PwxCDCPublisher – The main utility for streaming PWX change data to the target messaging system.
• PwxCDCAdmin – The utility for performing administrative functions, such as shutting down the PwxCDCPublisher process.
• PwxCDCInfo – The utility for reporting the color-coded status of the PwxCDCPublisher process and each of its main subprocesses. This utility can also report statistics and state information for each Kafka topic and for diagnostic attributes.

When you start the main PwxCDCPublisher utility from a command prompt, it performs the following processing:

• Retrieves the PWX extraction maps that define the data source objects.
• If you configure filtering criteria, filters the extraction maps.
• Starts an extraction process that reads change data from PWX Logger log files for the source objects.
• When the first change for an extraction map is received, generates an Avro schema.
• Formats the change data into Kafka messages based on the Avro schema.
• Connects to the Kafka target and sends the messages to the target. Consumer applications can then read the data from Kafka.
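
The schema and formatting steps above can be pictured with a small Python sketch. It is only an illustration under stated assumptions, not the Publisher's actual implementation: the names SchemaCache, build_avro_schema, and to_message are hypothetical, the schema is a generic Avro record rather than one of the Publisher's own schema formats, and JSON stands in for the real Avro encoding.

```python
import json


def build_avro_schema(map_name, columns):
    """Return a generic Avro record schema (as a dict) for one extraction map.

    Assumption: every column is modelled as a nullable string. The schema
    formats that the Publisher really generates are not reproduced here.
    """
    return {
        "type": "record",
        # Avoid dots in the record name; Avro treats them as namespace separators.
        "name": map_name.replace(".", "_"),
        "fields": [
            {"name": col, "type": ["null", "string"], "default": None}
            for col in columns
        ],
    }


class SchemaCache:
    """Builds a schema the first time a change arrives for an extraction map."""

    def __init__(self):
        self._schemas = {}
        self.generated = 0  # how many schemas were built so far

    def schema_for(self, map_name, change):
        if map_name not in self._schemas:
            self._schemas[map_name] = build_avro_schema(map_name, list(change))
            self.generated += 1
        return self._schemas[map_name]


def to_message(cache, map_name, change):
    """Format one change record as a message body that follows the schema."""
    schema = cache.schema_for(map_name, change)
    payload = {}
    for field in schema["fields"]:
        value = change.get(field["name"])
        payload[field["name"]] = None if value is None else str(value)
    return json.dumps(payload)


if __name__ == "__main__":
    cache = SchemaCache()
    print(to_message(cache, "d8eeee.customers", {"id": 1, "name": "Ann"}))
    print(to_message(cache, "d8eeee.customers", {"id": 2, "name": None}))
    print("schemas generated:", cache.generated)
```

The cache makes the "first change" behavior explicit: the second change for the same extraction map reuses the schema that the first change produced.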

CDCPublisherPWX.cfg:
############# PWX CDC Publisher Properties #############
# Configuration — This line must exist as the first or second line in a configuration file! —
# This file defines PWX-related properties for the PWX CDC Publisher.
# The following properties are the minimum required to identify the PWX
# source to be queried for change data capture data.
#################### PWX-related Properties ##############
#———————————————————————————————–
# The name assigned to the PWX CAPI_CONNECTION statement in the DBMOVER configuration file. This value is specified in the NAME parameter and depends on the data source type:
# (CAPI_CONNECTION=(NAME=..))
# Set this property to the value that you assign to the NAME parameter.
#———————————————————————————————–
Extract.pwxCapiConnectionName=ORA1CPX
#———————————————————————————————–
# The PWX-generated schema name that is included in the extraction map definition.
# This name is in the format nn instance, where nn is a two-digit value that represents the source type and instance is the instance value from the registration group.
#———————————————————————————————–
Extract.pwxExtractionMapSchemaName=d8eeee
#———————————————————————————————–
# An optional list of source objects (comma separated) that should be captured by the
# PWX CDC Publisher. Objects not in this list will be excluded.
# By default, when no list is provided, all objects are included.
# An asterisk can be used as a wildcard for one or more characters.
# Examples:
# To specify a single extraction map name whose activity is the only one to be captured:
# .MyExtractionMap
# To specify that all extraction maps starting with the letter A should be captured:
# .A*
# To specify both:
# .A*,.MyExtractionMap
#———————————————————————————————–
Extract.captureConsumerIncludeNameList=
#———————————————————————————————–
# An optional list of source objects (comma separated) that should be excluded from capture by the PWX CDC Publisher. Objects not in this list will be included.
# By default, when no list is provided, no objects are excluded.
# An asterisk can be used as a wildcard for one or more characters.
# Examples:
# To specify a single extraction map name whose activity is the only one to be excluded from capture:
# .MyExtractionMap
# To specify that all extraction maps starting with the letters HR should be excluded from capture:
# .HR*
# To specify both:
# .HR*,.MyExtractionMap
#———————————————————————————————–
#Extract.captureConsumerExcludeNameList=d8eeee.aabbccdd_xxx
Extract.captureConsumerExcludeNameList=
#———————————————————————————————–
# The node name in the PWX NODE statement that will be used to contact the PWX
# Listener, as defined in the DBMOVER configuration file.
# (NODE=(..))
# Set this property to the node name that you assign in the NODE statement.
#———————————————————————————————–
Extract.pwxNodeLocation=EEEE
#———————————————————————————————–
# A user ID with access permissions for the PWX node identified in the Extract.pwxNodeLocation setting.
# Use this parameter only if access to the node is protected.
# Typically, this value matches the CAPTURE_NODE_UID value in the Logger pwxccl.cfg file.
# Remove the leading ### comment characters to use this setting.
#———————————————————————————————–
###Extract.pwxNodeUserId=
#———————————————————————————————–
# The password for the user ID with access permissions to the PWX node identified in the Extract.pwxNodeLocation setting.
# Use this parameter only if access to the node is protected.
# Typically, this value matches the CAPTURE_NODE_PWD value in the Logger pwxccl.cfg file.
# Remove the leading ### comment characters to use this setting.
#———————————————————————————————–
###Extract.pwxNodePwd=
#———————————————————————————————–
# An encrypted password for the user ID with access permissions to the PWX node identified in the Extract.pwxNodeLocation setting.
# Use this parameter only if access to the node is protected and an encrypted password created by the PWX Navigator is in use.
# Typically, this value matches the CAPTURE_NODE_EPWD value in the Logger pwxccl.cfg file.
# Remove the leading ### comment characters to use this setting.
#———————————————————————————————–
###Extract.pwxNodeEPwd=
#———————————————————————————————–
# The node name in the PWX NODE statement that will be used to
# obtain extraction maps, as defined in the DBMOVER configuration file.
# (NODE=(…))
# Set this property to the node name that you assign in the NODE statement.
# Typically, this value matches the CAPTURE_NODE value in the Logger pwxccl.cfg file,
# if you use remote logging.
# Remove the leading ### comment characters to use this setting.
#———————————————————————————————–
#Extract.pwxXmapLocation=FFF
#———————————————————————————————–
# A user ID that has the permissions required to access the PWX node that will be used to obtain extraction maps.
# Use this parameter only if access to extraction maps is protected.
# Typically, this value matches the CAPTURE_NODE_UID value in the Logger pwxccl.cfg file.
# Remove the leading ### comment characters to use this setting.
#———————————————————————————————–
###Extract.pwxXmapUserId=
#———————————————————————————————–
# The password for the user ID with access permissions to the PWX node that will be used to obtain extraction maps.
# Use this parameter only if access to extraction maps is protected.
# Typically, this value matches the CAPTURE_NODE_PWD value in the Logger pwxccl.cfg file.
# Remove the leading ### comment characters to use this setting.
#———————————————————————————————–
###Extract.pwxXmapPwd=
# Uncomment the following setting to capture after images only.
#Extract.pwxUpdateImageOption=AI
Extract.extractorMaxWaitFullInputQueueSeconds=0
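
The include and exclude lists described in the comments above can be illustrated with a short Python sketch. It rests on assumptions the configuration file does not state: matching is taken to be case-sensitive, the exclude list is applied after the include list when both are set, and the helper names (parse_name_list, is_captured) are hypothetical. Following the comment text, an asterisk is treated as one or more characters.

```python
import re


def parse_name_list(value):
    """Split a comma-separated property value into a list of patterns."""
    return [item.strip() for item in value.split(",") if item.strip()]


def _matches(name, pattern):
    # Escape the pattern, then turn each escaped asterisk into "one or more
    # characters", as the configuration comments describe the wildcard.
    regex = re.escape(pattern).replace(r"\*", ".+")
    return re.fullmatch(regex, name) is not None


def is_captured(name, include_patterns, exclude_patterns):
    """Decide whether a source object would be captured.

    Assumption: an empty include list means "include everything", and the
    exclude list wins over the include list.
    """
    if include_patterns and not any(_matches(name, p) for p in include_patterns):
        return False
    return not any(_matches(name, p) for p in exclude_patterns)


if __name__ == "__main__":
    include = parse_name_list("d8eeee.A*,d8eeee.MyExtractionMap")
    exclude = parse_name_list("d8eeee.aabbccdd_xxx")
    for candidate in ("d8eeee.ACCOUNTS", "d8eeee.ORDERS", "d8eeee.MyExtractionMap"):
        print(candidate, is_captured(candidate, include, exclude))
```

With both properties left empty, as in the listing above, every extraction map under the schema is captured.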

CDCPublisherKafka.cfg:

################ PWX CDC Publisher Properties ################
# Configuration — This line must exist as the first or second line in a configuration file! —
# This file defines Kafka-related properties for the PWX CDC Publisher.
# The following properties are the minimum required to identify topics and establish
# connectivity to Kafka.
#################### Kafka-related Properties ####################
#———————————————————————————————–
# The type of output queue. The only valid value is Kafka.
#———————————————————————————————–
Connector.queueType=kafka
#———————————————————————————————–
# The name of the single Kafka topic to which to send all messages, or the option
# USE_TABLE_NAME. USE_TABLE_NAME uses a separate topic for each source table,
# using the table name as the topic name.
#———————————————————————————————–
Connector.kafkaTopic=USE_TABLE_NAME
#———————————————————————————————–
# The path and file name for the Kafka producer.properties file that is used to communicate with Kafka
#———————————————————————————————–
Connector.kafkaProducerPropertiesFile=/PWX_HOME/producer.properties
Connector.kafkaTableNamePrefix=xxxx_
Formatter.formatterRawEventBufferSize=12
Formatter.formatterOutputQueueSize=25
Connector.connectorRawEventBufferSize=50
Connector.connectorPendingBufferSize=120
Connector.connectorCheckpointAll=false
Connector.connectorCheckpointCommits=true
Connector.checkpointMessageFrequency=5000
Connector.kafkaProducerGuaranteeDelivery=false
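
To make the topic settings above concrete, here is a minimal Python sketch of how a topic name might be derived from them. The listing does not say how Connector.kafkaTableNamePrefix is applied, so the sketch assumes the prefix is simply prepended to the table name and only when USE_TABLE_NAME is in effect. The helpers load_properties and topic_for are hypothetical, and the loader handles only plain key=value lines and # comments.

```python
def load_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def topic_for(table_name, props):
    """Return the Kafka topic that a table's messages would be sent to.

    Assumption: the prefix is prepended to the table name, and it is ignored
    when a single fixed topic is configured.
    """
    topic = props.get("Connector.kafkaTopic", "")
    if topic == "USE_TABLE_NAME":
        return props.get("Connector.kafkaTableNamePrefix", "") + table_name
    return topic


if __name__ == "__main__":
    sample = """
    # excerpt from CDCPublisherKafka.cfg
    Connector.kafkaTopic=USE_TABLE_NAME
    Connector.kafkaTableNamePrefix=xxxx_
    """
    print(topic_for("CUSTOMERS", load_properties(sample)))
```

Checking the derived names against the topics that actually appear on the Kafka cluster is the quickest way to confirm or correct the prefix assumption.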

CDCPublisherCommon.cfg:
############## PWX CDC Publisher Properties ##############
# Configuration — This line must exist as the first or second line in a configuration file! —
# This file defines common properties for the PWX CDC Publisher.
# The following properties are part of the minimum required to configure a
# PWX CDC Publisher.
############### Common Properties #################
#————————————————————————————————–
# The host name or address and port number on which the PWX CDC Publisher listens for command-and-control requests. localhost is provided as a default.
#————————————————————————————————–
Common.pwxCDCPublisherPort=0000
Common.pwxCDCPublisherHost=127.0.0.1
#————————————————————————————————–
# The contents of the local PWX configuration file are logged to the PWX CDC Publisher log file on startup. Set Common.logPwxConfigContents=false to disable that logging.
#————————————————————————————————–
Common.logPwxConfigContents=true

CDCPublisherAvro.cfg:

################### PWX CDC Publisher Properties ################
# Configuration — This line must exist as the first or second line in a configuration file! —
# This file defines Avro-related properties for the PWX CDC Publisher.
# The following properties are the minimum required to identify the Avro message structure and encoding settings for Avro.
############# Avro-related Properties ###################
#———————————————————————————————–
# The type of formatter to use to format Kafka messages. The only valid value is Avro.
#———————————————————————————————–
Formatter.formatterType=avro
Formatter.formatterAddTimestampColumn=true
Formatter.formatterAddedTimestampColumnFormat=yyyy-MM-dd HH:mm:ss.SSS
#———————————————————————————————–
# The schema format to use for all generated Avro schemas.
# Valid values are: avroFlatSchemaFormatV1, avroNestedSchemaFormatV1, avroGenericSchemaFormatV1
#———————————————————————————————–
Formatter.avroSchemaFormat=avroFlatSchemaFormatV1
#———————————————————————————————–
# The Avro-supported encoding type that the formatter uses when serializing the Avro records.
# Valid values are: binary, json, or none
# The value "none" indicates that the Avro record will be serialized without explicit encoding.
#———————————————————————————————–
Formatter.avroEncodingType=binary
Formatter.avroFieldSorting=none
Formatter.avroBinaryAsString=true
#Formatter.formatterAddTimestampColumn=true
#Formatter.formatterAddedTimestampColumnFormat=true
#Formatter.avroIncludeBeforeImage=false
#Formatter.avroIncludeIsPresent=false
#Formatter.avroSchemaPrintDefaultFields=false
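
A consumer that reads the added timestamp column needs to parse the format configured above. The sketch below assumes that yyyy-MM-dd HH:mm:ss.SSS is a Java-style date pattern (the Publisher is Java-based) and maps it to the equivalent Python directives. The function names are hypothetical, and the sketch says nothing about the name of the added column or which point in time it records.

```python
from datetime import datetime

# Python equivalent of the Java-style pattern yyyy-MM-dd HH:mm:ss.SSS.
# %f accepts one to six fractional digits, so three-digit milliseconds parse.
PY_FORMAT = "%Y-%m-%d %H:%M:%S.%f"


def parse_added_timestamp(text):
    """Parse a timestamp string such as '2024-01-02 03:04:05.678'."""
    return datetime.strptime(text, PY_FORMAT)


def format_added_timestamp(value):
    """Format a datetime with millisecond precision, mirroring the pattern."""
    # strftime emits six fractional digits; drop the last three to keep millis.
    return value.strftime(PY_FORMAT)[:-3]


if __name__ == "__main__":
    parsed = parse_added_timestamp("2024-01-02 03:04:05.678")
    print(parsed.isoformat())
    print(format_added_timestamp(parsed))
```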

PwxCDCPubLog4j.xml:

<?xml version="1.0" encoding="UTF-8"?>
<Configuration name="ServiceConfig" status="error">
  <!--
    Sample PWX CDC Publisher configuration
    Produces output in directory PWXPUB_HOME/EEEE/logs/
    Current file is named PwxCdcPublisher.out
    Past files zipped with names PwxCdcPublisher.out.yyyy-MM-dd-HH-counter.gz
  -->
  <Properties>
    <Property name="charset">UTF-8</Property>
    <Property name="maxFileSize">10 MB</Property>
    <Property name="oldFilesPattern">.%d{yyyy-MM-dd-HH}.%i${traceFileExtension}.gz</Property>
    <Property name="pattern">%d{yyyy-MM-dd HH:mm:ss,SSS} [%-20.35t] %-5level %infamsg%n</Property>
    <Property name="serviceId">SHOULD_BE_UNIQUE</Property>
    <Property name="traceDir">/PWX_HOME/logs/</Property>
    <Property name="traceFile">PwxCdcPublisher</Property>
    <Property name="traceFileExtension">.out</Property>
  </Properties>
  <Appenders>
    <Console name="Console" target="SYSTEM_OUT">
      <PatternLayout>
        <Pattern>${pattern}</Pattern>
        <Charset>${charset}</Charset>
      </PatternLayout>
    </Console>
    <RollingFile name="TraceFile" fileName="${traceDir}${traceFile}${traceFileExtension}" filePattern="${traceDir}${traceFile}${oldFilesPattern}">
      <PatternLayout>
        <Pattern>${pattern}</Pattern>
        <Charset>${charset}</Charset>
      </PatternLayout>
      <DefaultRolloverStrategy max="999999"/>
      <Policies>
        <OnStartupTriggeringPolicy minSize="0"/>
        <TimeBasedTriggeringPolicy/>
        <SizeBasedTriggeringPolicy size="${maxFileSize}"/>
      </Policies>
    </RollingFile>
  </Appenders>
  <Loggers>
    <Logger name="com.informatica.msglogger" level="info" additivity="false" includeLocation="false">
      <AppenderRef ref="TraceFile"/>
    </Logger>
    <!--
      Change to level="all" below to include Trace messages
    -->
    <Logger name="trace" level="info" additivity="false" includeLocation="false">
      <AppenderRef ref="TraceFile"/>
    </Logger>
    <Logger name="org.beepcore.beep.transport.tcp.TCPSession" level="info" additivity="false" includeLocation="false">
      <AppenderRef ref="TraceFile"/>
    </Logger>
    <Root level="off">
      <AppenderRef ref="Console"/>
    </Root>
  </Loggers>
</Configuration>


Publisher Scripts:
To start the publisher, go to the PWX home directory (PWX_HOME) and run the command in the following format:
nohup sh /PWX_HOME/bin/PwxCDCPublisher.sh instance=/PWX_HOME/pubinstance_eeee > /PWX_HOME/logs/eeee_cdcpublisher.log

• Run the following command to check whether the publisher process is running:
ps -ef|grep pub|grep eeee

• Run the following command to report the status of the publisher:
sh /PWX_HOME/bin/PwxCDCInfo.sh instance=/PWX_HOME/pubinstance_eeee STATUS

• Run the following commands to perform CLEAR, RESET, or REPORT operations on the tables:
sh /PWX_HOME/bin/PwxCDCAdmin.sh instance=/PWX_HOME/pubinstance_eeee CLEAR=FORMAT TABLE=all

sh /PWX_HOME/bin/PwxCDCAdmin.sh instance=/PWX_HOME/pubinstance_eeee RESET=FORMAT TABLE=all
sh /PWX_HOME/bin/PwxCDCAdmin.sh instance=/PWX_HOME/pubinstance_eeee REPORT=FORMAT TABLE=all

• The reports for the above commands are generated in the following directory:
cd /PWX_HOME/reports/
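
When these commands are run from a scheduler or a monitoring job, it can help to build them in one place. The Python sketch below only assembles the argument lists for the PwxCDCInfo.sh and PwxCDCAdmin.sh calls shown above; it does not run anything. The function names are hypothetical, and /PWX_HOME and pubinstance_eeee are the same placeholders used in this article, not real paths.

```python
import shlex

PWX_HOME = "/PWX_HOME"  # placeholder, as used throughout this article


def info_status_cmd(instance_dir, pwx_home=PWX_HOME):
    """Argument list for the STATUS report of the PwxCDCInfo utility."""
    return ["sh", f"{pwx_home}/bin/PwxCDCInfo.sh", f"instance={instance_dir}", "STATUS"]


def admin_format_cmd(action, instance_dir, table="all", pwx_home=PWX_HOME):
    """Argument list for a CLEAR, RESET, or REPORT call of PwxCDCAdmin."""
    if action not in ("CLEAR", "RESET", "REPORT"):
        raise ValueError(f"unsupported action: {action}")
    return [
        "sh",
        f"{pwx_home}/bin/PwxCDCAdmin.sh",
        f"instance={instance_dir}",
        f"{action}=FORMAT",
        f"TABLE={table}",
    ]


if __name__ == "__main__":
    instance = f"{PWX_HOME}/pubinstance_eeee"
    # Print the commands for review; pass a list to subprocess.run to execute.
    print(shlex.join(info_status_cmd(instance)))
    for action in ("REPORT", "RESET"):
        print(shlex.join(admin_format_cmd(action, instance)))
```

Keeping the commands as argument lists avoids shell quoting problems if an instance path ever contains spaces.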

Limitations:
• If a PWX Logger is cold started (COLDSTART=Y) for any reason, start the CDC Publisher with the RESTART=FROM_BEGINING command so that it picks up the latest token values.
• Because the condense files are deleted, the CDC Publisher must read from the beginning of the condense files. To force this, remove the checkpoint files from the checkpoint directory under the CDC Publisher instance folder.
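
The checkpoint cleanup in the second limitation can be scripted cautiously. The Python sketch below is a dry run by default: it lists the files it would remove and deletes them only when asked. The directory name "checkpoint" is an assumption based on the wording above, and the function names are hypothetical, so verify the real layout of your instance folder first.

```python
from pathlib import Path


def checkpoint_files(instance_dir, checkpoint_dirname="checkpoint"):
    """List the regular files in the instance's checkpoint directory.

    Assumption: the directory is called "checkpoint"; pass another name if
    your instance folder uses a different one.
    """
    cp_dir = Path(instance_dir) / checkpoint_dirname
    if not cp_dir.is_dir():
        return []
    return sorted(p for p in cp_dir.iterdir() if p.is_file())


def remove_checkpoints(instance_dir, dry_run=True, checkpoint_dirname="checkpoint"):
    """Return the checkpoint files and delete them unless dry_run is True."""
    files = checkpoint_files(instance_dir, checkpoint_dirname)
    if not dry_run:
        for path in files:
            path.unlink()
    return files


if __name__ == "__main__":
    # Dry run: show what would be deleted for the placeholder instance path.
    for path in remove_checkpoints("/PWX_HOME/pubinstance_eeee"):
        print("would remove", path)
```

Reviewing the dry-run output before calling the function with dry_run=False keeps an accidental path mistake from deleting unrelated files.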

Conclusion:
The PWX CDC Publisher can satisfy our business requirements for up-to-the-minute data and can increase the productivity and efficiency of data processing. Please feel free to reach out to Cittabase for more information, and visit our blogs for more topics on Informatica.

References:
https://knowledge.informatica.com/